### 1. Installation

Download and install Apache Spark 3.2.1, along with its dependencies (Java Development Kit 8+)

In [1]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
!cp /content/drive/MyDrive/DS/spark-3.2.3-bin-hadoop3.2.tgz /content/

In [3]:
# Run below commands in google colab
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.0.0
#!wget -q https://archive.apache.org/dist/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz
# unzip it
!tar xf spark-3.2.3-bin-hadoop3.2.tgz
# install findspark



In [4]:
!pip install -q findspark

Set Environment Variables to make the Spark runtime visible to Linux OS. Please, note that you can manage multiple versions of spark by pointing to the correct version through environment variables. Run below set of commands to point to Apache Spark 3.2.1 version downloaded earlier.

In [5]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop3.2"

Test the installation worked successfully.

In [6]:
!pip install google-api-python-client

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [7]:
!pip install tweetnlp

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tweetnlp
  Downloading tweetnlp-0.4.3.tar.gz (54 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m54.3/54.3 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting ray[tune] (from tweetnlp)
  Downloading ray-2.5.0-cp310-cp310-manylinux2014_x86_64.whl (56.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.2/56.2 MB[0m [31m10.5 MB/s[0m eta [36m0:00:00[0m
Collecting urlextract (from tweetnlp)
  Downloading urlextract-1.8.0-py3-none-any.whl (21 kB)
Collecting transformers<=4.21.2 (from tweetnlp)
  Downloading transformers-4.21.2-py3-none-any.whl (4.7 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.7/4.7 MB[0m [31m56.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting huggingface-hub<=0.9.1 (from tweetnlp)
  Downloading huggingface_hub-0.9.1-py3-none-any.whl

In [8]:
import findspark
findspark.init()

In [9]:
# Verify the Spark version running on the virtual cluster
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

assert  "3." in sc.version, "Verify that the cluster Spark's version is 3.x"
print("Spark version:", sc.version)
print(sc)

from pyspark.sql import SparkSession
spark = SparkSession(sc)
print(spark)

Spark version: 3.2.3
<SparkContext master=local[*] appName=pyspark-shell>
<pyspark.sql.session.SparkSession object at 0x7fe9943e0670>


In [10]:
import pandas as pd
#Caricamento dei file csv su spark
data_paths = ["/content/drive/MyDrive/DS/2020-6.csv","/content/drive/MyDrive/DS/2020-7.csv","/content/drive/MyDrive/DS/2020-8.csv","/content/drive/MyDrive/DS/2020-9.csv","/content/drive/MyDrive/DS/2020-10.csv","/content/drive/MyDrive/DS/2020-11.csv","/content/drive/MyDrive/DS/2020-12.csv",]

tweets = spark.read \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("quote", "\"") \
  .option("escape", "\"") \
  .csv(data_paths, sep=',', multiLine=True)

tweets.printSchema()

root
 |-- tweetid: long (nullable = true)
 |-- userid: long (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- date: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- description: string (nullable = true)
 |-- text: string (nullable = true)
 |-- reply_userid: long (nullable = true)
 |-- reply_screen: string (nullable = true)
 |-- reply_statusid: double (nullable = true)
 |-- tweet_type: string (nullable = true)
 |-- friends_count: integer (nullable = true)
 |-- listed_count: integer (nullable = true)
 |-- followers_count: integer (nullable = true)
 |-- favourites_count: integer (nullable = true)
 |-- statuses_count: integer (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- hashtag: string (nullable = true)
 |-- urls_list: string (nullable = true)
 |-- profile_pic_url: string (nullable = true)
 |-- profile_banner_url: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- date_first_tweet: string (nullable = true)
 |-- acco

In [None]:
tweets.rdd.getNumPartitions()

7

In [None]:
tweets.select("tweet_type").distinct().show()

+--------------------+
|          tweet_type|
+--------------------+
|        quoted_tweet|
|            original|
|retweeted_tweet_w...|
|               reply|
+--------------------+



In [11]:
from pyspark.sql.functions import col, count, date_format, date_sub, date_trunc, month, next_day, to_timestamp, weekofyear, window, year
from pyspark.sql.functions import explode, expr, substring_index, avg,col, asc,desc, when

#name1 = "S_Devenish"
#name = "AdamOctavius"
name1 = "JimBonz"
path_query = f"/content/drive/MyDrive/DS/Query/{name1}/"
attrib = [
    "account_creation_date","date","description","followers_count","hashtag","lang","mentionsn","profile_pic_url",
    "qtd_screen","qtd_urls_list","reply_screen","rt_screen","rt_urls_list","screen_name","text","tweet_type",
    "tweetid","urls_list","verified"
]
# Filtro i tweets, ottengo solo quelli appartenenti all'utente specificato
name1_dataframe = tweets.select(*[attrib]).where(tweets.screen_name == name1)
print(name1_dataframe.count())

2077


In [None]:
#Creo la sottocartella con il nome dell'utente specificato
#dove verranno salvati i risultati delle query sotto forma di CSV
import os.path
from os import path
if path.exists(path_query) == False:
  os.mkdir(path_query)


In [None]:
# Carico su spark il csv contenente user-cat, cat = MYT\NMYT
userids = spark.read \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("quote", "\"") \
  .option("escape", "\"") \
  .csv("/content/drive/MyDrive/DS/userids.csv", sep=',', multiLine=True)

userids.printSchema()

root
 |-- user: string (nullable = true)
 |-- cat: string (nullable = true)



In [None]:
#Query0: estrazioni dati utente base
from pyspark.sql.functions import col, lit

query0 = name1_dataframe.select('screen_name', "verified", "description", "followers_count", "account_creation_date", "profile_pic_url")\
    .distinct()\
    .orderBy(desc('followers_count')) \
    .limit(1)

#Estraggo categoria utente
user_cat = userids.filter(col("user") == name1).collect()[0]['cat']

# Aggiungi la colonna cat
query0_with_cat = query0.withColumn('cat', lit(user_cat))

# Mostra il risultato
query0_with_cat.show()

query0_with_cat.toPandas().to_csv(f"{path_query}query0.csv", index=True)

+-----------+--------+--------------------+---------------+---------------------+--------------------+----+
|screen_name|verified|         description|followers_count|account_creation_date|     profile_pic_url| cat|
+-----------+--------+--------------------+---------------+---------------------+--------------------+----+
|    JimBonz|   false|Electronica - Int...|          26299| Sat Feb 28 04:28:...|http://pbs.twimg....|NMYT|
+-----------+--------+--------------------+---------------+---------------------+--------------------+----+



In [None]:
#Istogramma #Tweet giorno-ora
#Dal campo data si creano i due campi day_of_week ed hour, raggruppo su questi due campi e faccio count
#si ottiene il numero di tweet per ogni giorno-ora
query1 = name1_dataframe.select("date").withColumn('day_of_week', date_format('date', 'EEEE')) \
       .withColumn('hour', date_format('date', 'HH')).groupBy("day_of_week","hour").count().sort("day_of_week","hour")
query1.show(1000)


query1.toPandas().to_csv(f"{path_query}query1.csv", index=True)



+-----------+----+-----+
|day_of_week|hour|count|
+-----------+----+-----+
|     Friday|  00|   15|
|     Friday|  01|   20|
|     Friday|  02|   16|
|     Friday|  03|   15|
|     Friday|  04|   19|
|     Friday|  05|   22|
|     Friday|  06|   20|
|     Friday|  07|   13|
|     Friday|  08|   29|
|     Friday|  09|   15|
|     Friday|  10|    3|
|     Friday|  11|    3|
|     Friday|  12|    1|
|     Friday|  16|    5|
|     Friday|  17|   11|
|     Friday|  18|   18|
|     Friday|  19|   19|
|     Friday|  20|   17|
|     Friday|  21|   10|
|     Friday|  22|   14|
|     Friday|  23|   10|
|     Monday|  00|    8|
|     Monday|  01|   18|
|     Monday|  02|   18|
|     Monday|  03|   21|
|     Monday|  04|   19|
|     Monday|  05|   15|
|     Monday|  06|   10|
|     Monday|  07|   21|
|     Monday|  08|   17|
|     Monday|  09|    7|
|     Monday|  13|    2|
|     Monday|  15|    2|
|     Monday|  16|    4|
|     Monday|  17|   21|
|     Monday|  18|   18|
|     Monday|  19|   22|


In [None]:
#Isrogramma #Tweet per giorno della settimana
query2 = name1_dataframe.select("date").withColumn('day_of_week', date_format('date', 'EEEE')).groupBy("day_of_week",).count().sort("day_of_week",)
query2.show()
query2.toPandas().to_csv(f"{path_query}query2.csv", index=True)

+-----------+-----+
|day_of_week|count|
+-----------+-----+
|     Friday|  295|
|     Monday|  276|
|   Saturday|  275|
|     Sunday|  232|
|   Thursday|  302|
|    Tuesday|  379|
|  Wednesday|  318|
+-----------+-----+



In [None]:
#Isrogramma #Tweet per mese
query3 = name1_dataframe.select("date").withColumn('month', date_format('date', 'MMMM')).groupBy("month",).count().sort("month",)
query3.show()
query3.toPandas().to_csv(f"{path_query}query3.csv", index=True)

+---------+-----+
|    month|count|
+---------+-----+
|   August|  139|
| December|  420|
|     July|  113|
|     June|   49|
| November|  425|
|  October|  541|
|September|  390|
+---------+-----+



In [None]:
#Istogramma count tweet type
query4 = name1_dataframe.select("tweet_type").groupBy("tweet_type").count().sort("count")
query4.show()
query4.toPandas().to_csv(f"{path_query}query4.csv", index=True)

+--------------------+-----+
|          tweet_type|count|
+--------------------+-----+
|               reply|   85|
|        quoted_tweet|  296|
|            original|  302|
|retweeted_tweet_w...| 1394|
+--------------------+-----+



In [None]:
#Istogramma count lingua dei tweet
query5 = name1_dataframe.select("lang").groupBy("lang").count().sort("count")
query5.show()
query5.toPandas().to_csv(f"{path_query}query5.csv", index=True)

+----+-----+
|lang|count|
+----+-----+
|  de|    1|
|  tl|    1|
|  et|    1|
|  es|    3|
|  fr|    5|
| und|   50|
|  en| 2016|
+----+-----+



In [None]:
#Conteggio domini condivisi nei tweet
from pyspark.sql.functions import from_json, explode, expr, substring_index, count
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, IntegerType

# Definisci lo schema dei dati JSON
schema = ArrayType(
    StructType([
        StructField('url', StringType(), True),
        StructField('expanded_url', StringType(), True),
        StructField('display_url', StringType(), True),
        StructField('indices', ArrayType(IntegerType()), True)
    ])
)

#I link possono essere presenti in uno dei 3 campi in base al tipo di tweet

# Applica la funzione from_json per analizzare la colonna urls_list
df = name1_dataframe.select("urls_list","rt_urls_list","qtd_urls_list","tweet_type").withColumn('urls_data', from_json('urls_list', schema))
# Applica la funzione from_json per analizzare la colonna rt_urls_list
df = df.withColumn('rt_urls_data', from_json('rt_urls_list', schema))
# Applica la funzione from_json per analizzare la colonna rt_urls_list
df = df.withColumn('qtd_urls_data', from_json('qtd_urls_list', schema))
#Con la funzione from_json si trasforma un campo di tipo string con struttura json
#in un campo di tipo dizionario

# Estrai gli URL e i domini in base al campo tweet_type
df = df.withColumn('url_info', \
        when(col('tweet_type').isin('original', 'reply'), col('urls_data')) \
       .when(col('tweet_type').isin('quoted_tweet'), col('qtd_urls_data'))
       .otherwise(col('rt_urls_data')) ) \
       .withColumn('url_info', explode('url_info')) \
       .withColumn('url', expr('url_info.expanded_url')) \
       .withColumn('domain', substring_index(substring_index('url', '//', -1), '/', 1))

# Esegui una groupby count per contare le occorrenze dei domini
result = df.groupBy('domain').agg(count('*').alias('count'))
result = result.orderBy(col('count').desc())
#result.show(truncate=False)
query6 = result
query6.show(truncate=False)
query6.toPandas().to_csv(f"{path_query}query6.csv", index=True)





+-------------------------+-----+
|domain                   |count|
+-------------------------+-----+
|youtu.be                 |74   |
|www.nytimes.com          |52   |
|www.washingtonpost.com   |50   |
|www.rawstory.com         |41   |
|www.cnn.com              |32   |
|www.politico.com         |28   |
|cnn.it                   |26   |
|trib.al                  |23   |
|wapo.st                  |19   |
|www.citizensforethics.org|18   |
|www.nbcnews.com          |18   |
|nyti.ms                  |17   |
|www.forbes.com           |17   |
|twitter.com              |17   |
|www.axios.com            |16   |
|www.thedailybeast.com    |14   |
|www.theguardian.com      |13   |
|www.politicususa.com     |13   |
|www.huffpost.com         |13   |
|www.businessinsider.com  |11   |
+-------------------------+-----+
only showing top 20 rows



In [None]:
#Conteggio hashtag usati nei tweet

from pyspark.sql.functions import expr, explode, split, count
# ['WalkAwayFromDemocrats', 'MAGA', 'KAG', 'Trump2020']

# Hashtag è tipo string, rimuove i caratteri '[' e ']'
df = name1_dataframe.select("hashtag").withColumn('hashtag', expr("substring(hashtag, 3, length(hashtag)-4)"))

# Viene fatto lo split per generare una lista
df = df.withColumn('hashtag_array', split('hashtag', "', '"))

# Viene fatto explode su hashtag_array cosi da replicare ogni riga per quanti sono
# gli elem della lista hashtag_array
df = df.withColumn('hashtag', explode('hashtag_array'))

# Filtra le righe in cui il campo "hashtag" non è vuoto
df = df.filter(col("hashtag") != "")

# Esegui una groupby count sul singolo elemento della lista "hashtag"
result = df.groupBy('hashtag').agg(count('*').alias('count')).orderBy(desc('count'))


query7 = result
query7.show(truncate=False)
query7.toPandas().to_csv(f"{path_query}query7.csv", index=True)

+-----------------------+-----+
|hashtag                |count|
+-----------------------+-----+
|COVID19                |11   |
|VOTE                   |6    |
|Trump                  |6    |
|TraitorTrump           |5    |
|TrumpVirus             |5    |
|TrumpIsACompleteFailure|4    |
|coronavirus            |4    |
|BREAKING               |4    |
|TrumpIsALaughingStock  |4    |
|FBR                    |4    |
|TheResistanceIsGrowing |3    |
|TrumpRally             |3    |
|DiaperDon              |3    |
|BidenHarris2020        |3    |
|FollowBackResister     |3    |
|ResistanceArmy         |3    |
|TrumpMeltdown          |3    |
|StopTrumpsTerror       |3    |
|Vote                   |3    |
|VoteBlue               |3    |
+-----------------------+-----+
only showing top 20 rows



In [None]:
#Replied users
from pyspark.sql.functions import col

df = name1_dataframe.select("tweet_type","reply_screen").filter(name1_dataframe["tweet_type"] == "reply")

# Esegui una groupby count sul campo reply_screen, il quale contiene l'username
# dell'utente a cui si è risposto
result = df.groupBy('reply_screen').agg(count('*').alias('count')).orderBy(desc('count'))
#join tra la tabella reply_screen-count con la tabella userids, cosi da poter
#associare il campo cat ad ogni utente
joined_df = result.join(userids, col("reply_screen") == col("user"), "left").drop("user")

query8 = joined_df
query8.show(truncate=False)
query8.toPandas().to_csv(f"{path_query}query8.csv", index=True)

+---------------+-----+----+
|reply_screen   |count|cat |
+---------------+-----+----+
|realDonaldTrump|24   |null|
|JoeBiden       |5    |null|
|AnnMar43405922 |2    |null|
|funder         |2    |null|
|TaupinGloria   |2    |null|
|AZGOP          |1    |null|
|Lrihendry      |1    |null|
|honestelaine   |1    |null|
|donwinslow     |1    |null|
|Djane_Aileen   |1    |null|
|AlexanderSkitch|1    |null|
|RightWingWatch |1    |null|
|wendyphi       |1    |null|
|debdafoe       |1    |null|
|anenews        |1    |null|
|CNBC           |1    |null|
|Slate          |1    |null|
|nytopinion     |1    |null|
|str8outadurango|1    |null|
|Nulli_Secunda_3|1    |null|
+---------------+-----+----+
only showing top 20 rows



In [None]:
#Retweeted users

df = name1_dataframe.select("tweet_type","rt_screen").filter(name1_dataframe["tweet_type"] == "retweeted_tweet_without_comment")


# Esegui una groupby count sul campo rt_screen
result = df.groupBy('rt_screen').agg(count('*').alias('count')).orderBy(desc('count'))

joined_df = result.join(userids, col("rt_screen") == col("user"), "left").drop("user")

#joined_df.show(truncate=False)

query9 = joined_df
query9.show(truncate=False)
query9.toPandas().to_csv(f"{path_query}query9.csv", index=True)


+---------------+-----+----+
|rt_screen      |count|cat |
+---------------+-----+----+
|JoeBiden       |47   |null|
|mmpadellan     |32   |null|
|funder         |26   |null|
|CREWcrew       |25   |null|
|KamalaHarris   |23   |null|
|MeidasTouch    |23   |null|
|HKrassenstein  |23   |null|
|donwinslow     |20   |null|
|ProjectLincoln |15   |null|
|PalmerReport   |15   |null|
|TeaPainUSA     |14   |null|
|SethAbramson   |13   |null|
|SenKamalaHarris|13   |null|
|washingtonpost |12   |null|
|ReallyAmerican1|11   |null|
|TheDemCoalition|10   |null|
|chipfranklin   |10   |null|
|CNNPolitics    |9    |null|
|MSNBC          |9    |null|
|nytimes        |9    |null|
+---------------+-----+----+
only showing top 20 rows



In [None]:
#Quoted users

df = name1_dataframe.select("tweet_type","qtd_screen").filter(name1_dataframe["tweet_type"] == "quoted_tweet")


# Esegui una groupby count sul campo qtd_screen
result = df.groupBy('qtd_screen').agg(count('*').alias('count')).orderBy(desc('count'))

joined_df = result.join(userids, col("qtd_screen") == col("user"), "left").drop("user")

#joined_df.show(truncate=False)

query10 = joined_df
query10.show(truncate=False)
query10.toPandas().to_csv(f"{path_query}query10.csv", index=True)


+---------------+-----+----+
|qtd_screen     |count|cat |
+---------------+-----+----+
|kylegriffin1   |9    |null|
|atrupar        |8    |null|
|ProjectLincoln |8    |null|
|MeidasTouch    |7    |null|
|donwinslow     |6    |null|
|Acyn           |6    |null|
|JimBonz        |5    |NMYT|
|_HeatherWalker |5    |null|
|CNN            |4    |null|
|nytimes        |4    |null|
|JoeBiden       |4    |null|
|washingtonpost |4    |null|
|NBCNews        |3    |null|
|Reuters        |3    |null|
|ReallyAmerican1|3    |null|
|DrEricDing     |3    |null|
|marcorubio     |2    |null|
|joncoopertweets|2    |null|
|funder         |2    |null|
|jonathanvswan  |2    |null|
+---------------+-----+----+
only showing top 20 rows



In [None]:
import tweetnlp
model_e = tweetnlp.load_model('emotion')
model_o = tweetnlp.load_model('offensive')

Downloading config.json:   0%|          | 0.00/725 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/878k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/476M [00:00<?, ?B/s]

In [None]:
# Utenti più menzionati
# è diverso dalle query precedenti quoted, reply, retweed,
# poichè questa query considera i tag espliciti nei tweet

from pyspark.sql.functions import udf
from pyspark.sql.functions import from_json, explode, expr, substring_index, count, split


df = name1_dataframe.select("mentionsn")

# mentionsn è tipo string, rimuove i caratteri '[' e ']'
df = df.withColumn('mentionsn_', expr("substring(mentionsn, 3, length(mentionsn)-4)"))

# Separa la stringa "mentionsn" in un array di elementi
df = df.withColumn('mention_array', split('mentionsn_', "', '"))

# Viene fatto l'explode sul campo mention_array, cosi da replicare ogni row
# per quanti elementi ci sono nella lista mention_array
df = df.withColumn('mention', explode('mention_array'))

# Filtra le righe in cui il campo "mention" non è vuoto
df = df.filter(col("mention") != "")
# Viene effettuato il groupby sul campo mention e poi count
result = df.groupBy('mention').agg(count('*').alias('count')).orderBy(desc('count'))

query11 = result
query11.show(20, truncate=False)
query11.limit(20).toPandas().to_csv(f"{path_query}query11.csv", index=True)



+---------------+-----+
|mention        |count|
+---------------+-----+
|realDonaldTrump|273  |
|JoeBiden       |149  |
|YouTube        |65   |
|mmpadellan     |38   |
|KamalaHarris   |38   |
|funder         |32   |
|MeidasTouch    |31   |
|donwinslow     |30   |
|ProjectLincoln |28   |
|CREWcrew       |26   |
|HKrassenstein  |23   |
|TeaPainUSA     |19   |
|SethAbramson   |18   |
|CNN            |15   |
|PalmerReport   |15   |
|SenKamalaHarris|15   |
|washingtonpost |14   |
|ReallyAmerican1|14   |
|BarackObama    |13   |
|SenDuckworth   |12   |
+---------------+-----+
only showing top 20 rows



In [None]:
#Query 12
#Emotion, Offensive sui tweet delle 3 persone più menzionate (calcolate in query 11)
import pyspark.sql.functions as F

@udf
def emotion(text):
  return model_e.emotion(text)['label']
@udf
def offensive(text):
  return model_o.offensive(text)['label']

#NOTA mentionsn è ['a','b'], però è tipo stringa
df = name1_dataframe.select("text","mentionsn")
# * davanti alla lista fa l'unroll

most_ment = [e["mention"] for e in query11.head(3)]
print(most_ment)


['realDonaldTrump', 'JoeBiden', 'YouTube']


In [None]:
#Continuo Query 12, è svolta in python e non pyspark per problemi di memoria
lis = ["offensive", "non-offensive", 'anger','anticipation','disgust','fear','joy','love','optimism','pessimism','sadness','surprise','trust']
dic = {e:{l: 0 for l in lis} for e in most_ment}
print(dic)

#Estraggo i tweet dove sono menzionati in mentions almeno uno dei 3 utenti più menzionati dall'utente
#la riga di sotto serve per implementare any, cioè che dei 3 utenti basta che ne sia presente uno
#si utilizza F.greatest perchè restituisce True se è presente almeno un True nella lista
# * serve per fare unroll

df_mention_filter = name1_dataframe.select("text", "mentionsn") \
  .filter(F.greatest(*[F.col("mentionsn").contains(name) for name in most_ment]) == True)

# Itero sui risultati restituiti dalla query
for row in df_mention_filter.rdd.toLocalIterator():
  for m in most_ment:
    if m in row["mentionsn"]:
      off = model_o.offensive(row["text"])["label"]
      emo = model_e.emotion(row["text"])["label"]
      #print("%s %s %s" % (m, off, emo))
      dic[m][off] += 1
      dic[m][emo] += 1


{'realDonaldTrump': {'offensive': 0, 'non-offensive': 0, 'anger': 0, 'anticipation': 0, 'disgust': 0, 'fear': 0, 'joy': 0, 'love': 0, 'optimism': 0, 'pessimism': 0, 'sadness': 0, 'surprise': 0, 'trust': 0}, 'JoeBiden': {'offensive': 0, 'non-offensive': 0, 'anger': 0, 'anticipation': 0, 'disgust': 0, 'fear': 0, 'joy': 0, 'love': 0, 'optimism': 0, 'pessimism': 0, 'sadness': 0, 'surprise': 0, 'trust': 0}, 'YouTube': {'offensive': 0, 'non-offensive': 0, 'anger': 0, 'anticipation': 0, 'disgust': 0, 'fear': 0, 'joy': 0, 'love': 0, 'optimism': 0, 'pessimism': 0, 'sadness': 0, 'surprise': 0, 'trust': 0}}


In [None]:
print(dic)
import json
with open(f"{path_query}query12.json", "w") as outfile:
    json.dump(dic, outfile)

{'realDonaldTrump': {'offensive': 51, 'non-offensive': 220, 'anger': 199, 'anticipation': 7, 'disgust': 40, 'fear': 7, 'joy': 11, 'love': 0, 'optimism': 1, 'pessimism': 0, 'sadness': 6, 'surprise': 0, 'trust': 0}, 'JoeBiden': {'offensive': 2, 'non-offensive': 145, 'anger': 31, 'anticipation': 42, 'disgust': 13, 'fear': 0, 'joy': 28, 'love': 0, 'optimism': 29, 'pessimism': 0, 'sadness': 4, 'surprise': 0, 'trust': 0}, 'YouTube': {'offensive': 2, 'non-offensive': 63, 'anger': 33, 'anticipation': 10, 'disgust': 14, 'fear': 3, 'joy': 2, 'love': 0, 'optimism': 2, 'pessimism': 0, 'sadness': 1, 'surprise': 0, 'trust': 0}}


In [None]:
'''
#Query 12 implementata in Spark ma non va per la memoria
#Estraggo i tweet dove sono menzionati in mentions almeno uno dei 5 utenti più menzionati dall'utente
#la riga di sotto serve per implementare any, cioè che delle 5 condizioni ne basta almeno una True
#si utilizza F.greatest perchè restituisce True se è presente almeno un True nella lista
# * serve per fare unroll
df = df.filter(F.greatest(*[F.col("mentionsn").contains(name) for name in most_ment]) == True)
df.show(100, truncate = False)
df = df.withColumn('emotion', emotion(df["text"]))
df = df.withColumn('offensive', offensive(df["text"]))
df.show(20)
df = df.select( "mentionsn", "emotion", "offensive")
df = df.withColumn('mentionsn_', expr("substring(mentionsn, 3, length(mentionsn)-4)"))

# Separa la stringa "mentionsn" in un array di elementi
df = df.withColumn('mention_array', split('mentionsn_', "', '"))

# Espandi la colonna "mention_array" in righe separate
df = df.withColumn('mention', explode('mention_array'))
# Dato che ho fatto explode magari ho righe ripetute con un mention che però non è
# nei più menzionati, ex. mentionsn è [realDonaldTrump, a, b], dopo explode mi trovo 3 righe
# con lo stesso text, ma una con mention realDonaldTrump, una a, una b, per alleggerire il count voglio filtrare
df = df.filter(F.greatest(*[F.col("mention").contains(name) for name in most_ment]) == True)
df.show(20)
# Esegui una groupby count sul singolo elemento della lista "hashtag"

df_sel = df.select("mention", "emotion", "offensive")
result1 = df_sel.groupBy('mention', 'emotion').agg(count('*').alias('count')).orderBy(desc('count'))

#result.show(20)

result2 = df_sel.groupBy('mention', 'offensive').agg(count('*').alias('count')).orderBy(desc('count'))

#result.show(20)
'''

'\n#Query 12 implementata in Spark ma non va per la memoria\n#Estraggo i tweet dove sono menzionati in mentions almeno uno dei 5 utenti più menzionati dall\'utente\n#la riga di sotto serve per implementare any, cioè che delle 5 condizioni ne basta almeno una True\n#si utilizza F.greatest perchè restituisce True se è presente almeno un True nella lista\n# * serve per fare unroll\ndf = df.filter(F.greatest(*[F.col("mentionsn").contains(name) for name in most_ment]) == True)\ndf.show(100, truncate = False)\ndf = df.withColumn(\'emotion\', emotion(df["text"]))\ndf = df.withColumn(\'offensive\', offensive(df["text"]))\ndf.show(20)\ndf = df.select( "mentionsn", "emotion", "offensive")\ndf = df.withColumn(\'mentionsn_\', expr("substring(mentionsn, 3, length(mentionsn)-4)"))\n\n# Separa la stringa "mentionsn" in un array di elementi\ndf = df.withColumn(\'mention_array\', split(\'mentionsn_\', "\', \'"))\n\n# Espandi la colonna "mention_array" in righe separate\ndf = df.withColumn(\'mention\', 

In [None]:
#Tipo di cross posting yt, si vuole indagare come l'utente condivide video yt
# se retweetta, se sono post originali ecc

# Dai tweet vengono selezionati quelli che contengono un url youtube
# in almeno uno dei campi dove sono presenti link
query13 = name1_dataframe.select('urls_list', 'rt_urls_list', 'qtd_urls_list', 'tweet_type')\
    .filter( col('urls_list').contains('youtu') | (col('rt_urls_list').contains('youtu') | (col('qtd_urls_list').contains('youtu'))  )  )

# Group by per tipo di tweet
result = query13.groupBy('tweet_type')\
    .count()

result.show(truncate = False)
result.toPandas().to_csv(f"{path_query}query13.csv", index=True)


+-------------------------------+-----+
|tweet_type                     |count|
+-------------------------------+-----+
|original                       |72   |
|retweeted_tweet_without_comment|7    |
|quoted_tweet                   |2    |
+-------------------------------+-----+



In [None]:
#Roba kaggle

#Keras model

import tensorflow as tf
# import modello già addestrato
model = tf.keras.models.load_model('/content/drive/MyDrive/DS/Political Leaning Recognition/model.h5')
model.summary()

Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_2 (Dense)             (None, 64)                160064    
                                                                 
 dense_3 (Dense)             (None, 1)                 65        
                                                                 
Total params: 160,129
Trainable params: 160,129
Non-trainable params: 0
_________________________________________________________________


In [None]:
import pickle

# si importa un oggetto serializzato python
# utilizzato per adattare i dati al formato di input
# desiderato dalla rete

with open('/content/drive/MyDrive/DS/Political Leaning Recognition/vectorizer_data.pk1', 'rb') as f:
    saved_data = pickle.load(f)

vectorizer = saved_data['vectorizer']
data = saved_data['data']

vectorizer.fit(data['Tweet'])

In [None]:
#preprocessing functions
import re
import string

def features_preprocess (text):
    # Remove all the special characters
    processed_feature = re.sub(r'\W', ' ', str(text))

    # remove all single characters
    processed_feature= re.sub(r'\s+[a-zA-Z]\s+', ' ', processed_feature)

    # Remove single characters from the start
    processed_feature = re.sub(r'\^[a-zA-Z]\s+', ' ', processed_feature)

    # Substituting multiple spaces with single space
    processed_feature = re.sub(r'\s+', ' ', processed_feature, flags=re.I)

    # Removing prefixed 'b'
    processed_feature = re.sub(r'^b\s+', '', processed_feature)

    # Converting to Lowercase
    processed_feature = re.sub('https://t.co','',processed_feature)
    processed_feature = re.sub('https','',processed_feature)
    processed_feature = re.sub(' co ','',processed_feature)
    processed_feature = re.sub('amp','',processed_feature)


    processed_feature = processed_feature.lower()
    return processed_feature


def remove_punct(text):
    text  = "".join([char for char in text if char not in string.punctuation])
    text = re.sub('[0-9]+', '', text)
    return text


def preprocess_text(text):
    text = features_preprocess(text)
    text = remove_punct(text)  # Apply the existing preprocessing steps
    text = text.lower()  # Convert to lowercase
    return text

In [None]:
from traitlets import validate
#prediction pipeline

import numpy as np

def predict_class(val):
    print(val)
    val = np.round(val).astype(int)
    print(val)
    if val == 0:
        return 'Democrat', val
    elif val == 1:
        return 'Republican', val

def leaning(tweet):
    processed_new_text = preprocess_text(tweet)

    # Transform the processed new text using the fitted vectorizer
    new_text_vector = vectorizer.transform([processed_new_text]).toarray()

    pred_neur, val = predict_class(model.predict(new_text_vector))
    #pred_neur = label_encoder.inverse_transform(pred_neur)

    return pred_neur, val

In [None]:
import tweetnlp
model_s = tweetnlp.load_model('sentiment')
model_i = tweetnlp.load_model('irony')
model_e = tweetnlp.load_model('emotion')
model_t = tweetnlp.load_model('topic_classification')

Downloading config.json:   0%|          | 0.00/929 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/878k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/478M [00:00<?, ?B/s]

Some weights of the model checkpoint at cardiffnlp/twitter-roberta-base-sentiment-latest were not used when initializing RobertaForSequenceClassification: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
- This IS expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing RobertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Downloading config.json:   0%|          | 0.00/705 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/878k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/476M [00:00<?, ?B/s]

Downloading config.json:   0%|          | 0.00/1.17k [00:00<?, ?B/s]

Downloading tokenizer_config.json:   0%|          | 0.00/409 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/780k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/2.01M [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/280 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/476M [00:00<?, ?B/s]

Downloading config.json:   0%|          | 0.00/1.96k [00:00<?, ?B/s]

Downloading tokenizer_config.json:   0%|          | 0.00/354 [00:00<?, ?B/s]

Downloading vocab.json:   0%|          | 0.00/780k [00:00<?, ?B/s]

Downloading merges.txt:   0%|          | 0.00/446k [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/1.29M [00:00<?, ?B/s]

Downloading special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/476M [00:00<?, ?B/s]

In [None]:
#all_tweets = tweets.select("tweetid", "text").where(tweets.screen_name == name1)
all_tweets = name1_dataframe.select("tweetid", "text")
print(all_tweets.count())

2077


In [None]:
import csv
# Per ogni tweet dell'utente viene effettuata la sentiment analysis
# tramite l'utilizzo di TweetNLP
# il risultato è salvato in sentiment.csv

if os.path.exists(f'{path_query}sentiment.csv'):
    # Elimina il file se esiste già
    os.remove(f'{path_query}sentiment.csv')

with open(f'{path_query}sentiment.csv', 'a', newline='') as file:
    writer = csv.writer(file)

    writer.writerow(["tweetid", "sentiment", "irony", "emotion", "topic", "leaning"])

    #Itero su tutti i tweet dell'utente
    for row in all_tweets.rdd.toLocalIterator():
        s = row["text"]
        print(s)
        sent1 = model_s.sentiment(s)
        sent2 = model_i.irony(s)
        sent3 = model_e.emotion(s)
        sent4 = model_t.topic(s)
        #sent5 = model_ner.ner(s)
        sent6, val = leaning(s)
        print(sent1)
        print(sent2)
        print(sent3)
        print(sent4)
        #print(sent5)
        print(sent6)

        writer.writerow([row["tweetid"], sent1, sent2, sent3, sent4, val])

        print("-----------")

[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m
RT @politico: Education Secretary Betsy DeVos is under investigation for potentially violating the Hatch Act after she slammed Joe Biden in…
[[0.5209771]]
[[1]]
{'label': 'negative'}
{'label': 'irony'}
{'label': 'anger'}
{'label': ['news_&_social_concern']}
Republican
-----------
RT @CandiceAiston: "The American Dream belongs to ALL of us." ~Kamala Harris https://t.co/lJMwcseKLX
[[0.43297312]]
[[0]]
{'label': 'positive'}
{'label': 'non_irony'}
{'label': 'optimism'}
{'label': ['news_&_social_concern']}
Democrat
-----------
RT @JCTheResistance: JFC. ”officials were searching for crowd control technology deemed too unpredictable to use in war zones” yet Trump wa…
[[0.35595858]]
[[0]]
{'label': 'negative'}
{'label': 'irony'}
{'label': 'anger'}
{'label': ['news_&_social_concern']}
Democrat
-----------
RT @TomArnold: All those years of sucking up to rich white supremacists and Ann Coulter doesn't get one tax tip? https://t.co/nL

In [None]:
 # Carico su spark il risultato della query sentiment ottenuto in precedenza
 # il csv contiene la sentiment analysis per ogni video yt
sentiment_tweet = spark.read \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("quote", "\"") \
  .option("escape", "\"") \
  .csv(f'{path_query}sentiment.csv', sep=',', multiLine=True)

sentiment_tweet.printSchema()

root
 |-- tweetid: long (nullable = true)
 |-- sentiment: string (nullable = true)
 |-- irony: string (nullable = true)
 |-- emotion: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- leaning: string (nullable = true)



In [None]:
# query14 numero di video yt condividi e rimossi / numero video yt condividi in totale
# inoltre dal file sentiment.csv si filtrano i tweet contenenti link youtube e si
# calcolano le occorrenze dei sentimenti rilevati nei tweet

# Estraggo tutti i video yt

from pyspark.sql.functions import from_json, explode, expr, substring_index, count
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, IntegerType

# Definisci lo schema dei dati JSON
schema = ArrayType(
    StructType([
        StructField('url', StringType(), True),
        StructField('expanded_url', StringType(), True),
        StructField('display_url', StringType(), True),
        StructField('indices', ArrayType(IntegerType()), True)
    ])
)
# I link possono essere presenti in tre campi diversi a seconda del tweet type

# Applica la funzione from_json per analizzare la colonna urls_list(reply, original)
df = name1_dataframe.select('urls_list', 'rt_urls_list', 'qtd_urls_list', 'tweet_type','tweetid') \
.withColumn('urls_data', from_json('urls_list', schema))
# Applica la funzione from_json per analizzare la colonna rt_urls_list(retweeted)
df = df.withColumn('rt_urls_data', from_json('rt_urls_list', schema))
# funz json per la colonna qtd_urls_list (quoted)
df = df.withColumn('qtd_urls_data', from_json('qtd_urls_list', schema))

# Estrai gli URL e i domini in base al campo tweet_type
# se il tweet_type è quoted allora i link sono in qtd_urls_data
# e cosi via
df = df.withColumn('url_info', \
        when(col('tweet_type').isin('original', 'reply'), col('urls_data')) \
       .when(col('tweet_type').isin('quoted_tweet'), col('qtd_urls_data'))
       .otherwise(col('rt_urls_data')) ) \
       .withColumn('url_info', explode('url_info')) \
       .withColumn('url', expr('url_info.expanded_url'))

#ottengo solo i tweet che contengono link youtube
df = df.filter( col('url').contains('youtu') )
# genero un dizionario con più elementi di tipo tweetid:url
yt_video_dict = {row['tweetid']:row['url'] for row in df.collect()}

#df = df.withColumn('domain', substring_index(substring_index('url', '//', -1), '/', 1))

# Esegui una groupby count per contare le occorrenze dei domini
n_total_video = df.agg(count('*').alias('count')).collect()[0]['count']
print(n_total_video)
print(yt_video_dict)



81
{1344107234821775361: 'https://www.youtube.com/watch?v=6Ag6qopowd0', 1338683974978179073: 'https://youtu.be/v4Auu_k7PRU', 1341894903991656448: 'https://youtu.be/BKG6kOXquSY', 1339795692043169792: 'https://youtu.be/yquoXL3_Eu4', 1339796569286037511: 'https://youtu.be/3v1njO2Z8mU', 1342279401807888385: 'https://www.youtube.com/watch?v=gDf8PoakKj4', 1341502263332761600: 'https://www.youtube.com/watch?v=7ppMpvL46bw', 1340482886827343872: 'https://youtu.be/UYwMf2N9rJE', 1337501428198207488: 'https://youtu.be/DZmyFid7FRg', 1339480692078493697: 'https://youtu.be/5e3Xn81boSQ', 1342041887339737088: 'https://youtu.be/X83hUR3PP14', 1339084512047730690: 'https://youtu.be/BxOZ4i7kL3E', 1339855305706856448: 'https://youtu.be/Bj1cRwDIs_8', 1341855829515849728: 'https://www.youtube.com/watch?v=-fQq8koZM7M&feature=youtu.be&ab_channel=CB', 1341862146125139968: 'https://www.youtube.com/watch?v=dsPVvLUVlOo', 1343832553719918592: 'https://www.rawstory.com/new-trump-campaign-ad---youtube/', 1336203652336

In [None]:
# continuo query14
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

def parseYoutubeURL(url):
   data = re.findall(r"(?:v=|\/)([0-9A-Za-z_-]{11}).*", url)
   if data:
       return data[0]
   return ""

def is_video_removed(video_id, api_key):
    try:
        # Creare un'istanza dell'oggetto YouTube Data API
        youtube = build('youtube', 'v3', developerKey=api_key)

        #  richiesta API per ottenere i dettagli del video
        response = youtube.videos().list(
            part='status',
            id=video_id
        ).execute()
        l = len(response['items'])
        if(l==0): #rimosso
          return True
        # Verificare lo stato del video
        if response['items'][0]['status']['uploadStatus'] == 'removed':
            return True
        else:
            return False

    except HttpError as e:
        print('Errore durante la richiesta API:', e)
        return False

# La tua chiave API
api_key = 'AIzaSyAFGQaSWt4GS1D8xEtS2hpwYFwzL6sqU-U'
# L'obbiettivo
n_removed = 0
n_neutral = 0
n_positive = 0
n_negative = 0

# Itero sul dizionario dei video yt estratti in precedenza

for tweetid, video in yt_video_dict.items():
  v_id = parseYoutubeURL(video)

  sent = sentiment_tweet.select("sentiment").filter(col("tweetid")==tweetid).collect()[0]["sentiment"]
  if "neutral" in sent:
    n_neutral += 1
  elif "positive" in sent:
    n_positive += 1
  else:
    n_negative += 1
  # Verificare se il video è stato rimosso
  removed = is_video_removed(v_id, api_key)
  if removed:
      n_removed += 1

  #print(f"{tweetid}, removed: {removed}, sentiment:{sent}, url:{video}")

print(n_removed)
print(n_total_video)
print(n_removed/n_total_video)
print(f"pos: {n_positive}, neg: {n_negative}, neu: {n_neutral}")

2
81
0.024691358024691357
pos: 2, neg: 18, neu: 61


In [None]:
with open(f'{path_query}query14.csv', 'w') as f:
    f.write("n_removed,n_total_video,ratio,n_positive,n_neutral,n_negative\n")
    f.write(f"{n_removed},{n_total_video},{n_removed/n_total_video},{n_positive},{n_negative},{n_neutral}")