### Twitter hashtag trends per country for the Russo-Ukraine war.
##### https://www.kaggle.com/datasets/bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows?resource=download

##### Step 1: ensure we can Read/Write data

In [1]:
# Core dependencies
from pyspark.sql import SparkSession
from pyspark.sql.types import MapType, StringType
from pyspark.sql import functions as F
from matplotlib import cm
from wordcloud import WordCloud
from geopy.geocoders import Nominatim
from PIL import Image
import matplotlib.pyplot as plt
import pandas as pd
import geopandas as gpd
import numpy as np
import random, re
import json
import os


os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
pd.set_option('display.max_colwidth', None)
seed = random.seed(42)
FS_PORT = 9334
PARTITIONS = 600
APP_NAME = 'DA381A_Project'
PATH = f"hdfs://localhost:{FS_PORT}/project/*"
CUTOFF_LIM = 1000


In [2]:
# init spark session
spark = SparkSession.builder.appName(APP_NAME).config("spark.driver.memory", "16g").getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.databricks.io.cache.enabled", True) # delta caching
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True) # adaptive query execution for skewed data
spark.conf.set("spark.databricks.optimizer.rangeJoin.binSize", 20) #range optimizer

In [3]:
# Read the data from Hadoop into a DataFrame
df = spark.read.csv(path=f"{PATH}.csv", header=True, inferSchema=True)

In [4]:
df = df.repartition(PARTITIONS)
df.show(1)

+--------------------+--------------------+--------+--------+--------+--------------------+---------+-----------+-------------+-------+--------------+------------+----+--------+--------+-----------+--------------+-----------+
|                 _c0|              userid|username|acctdesc|location|           following|followers|totaltweets|usercreatedts|tweetid|tweetcreatedts|retweetcount|text|hashtags|language|coordinates|favorite_count|extractedts|
+--------------------+--------------------+--------+--------+--------+--------------------+---------+-----------+-------------+-------+--------------+------------+----+--------+--------+-----------+--------------+-----------+
|#Russie #Ukraine ...|[{'text': 'Poutin...|      fr|    null|       0|2022-04-01 07:25:...|     null|       null|         null|   null|          null|        null|null|    null|    null|       null|          null|       null|
+--------------------+--------------------+--------+--------+--------+--------------------+-----

#### We will continue now to preprocess the data to be left with the information required to build our graphs.

In [5]:
df.columns

['_c0',
 'userid',
 'username',
 'acctdesc',
 'location',
 'following',
 'followers',
 'totaltweets',
 'usercreatedts',
 'tweetid',
 'tweetcreatedts',
 'retweetcount',
 'text',
 'hashtags',
 'language',
 'coordinates',
 'favorite_count',
 'extractedts']

In [6]:
cols_to_keep = ('userid', 'hashtags')
df_filtered = df.select(*cols_to_keep)

##### As we are interested in the hashtags, we need to remove all tweets that do not have a hashtag and remove all non-dict hashtags

In [7]:
df_filtered = df_filtered.filter(F.col('hashtags').isNotNull() & ~F.col('hashtags').rlike(r"^[^{]+$")) # source: https://chat.openai.com

#### We should now retrieve only the text from the hashtag(s) used in the tweets

In [8]:
df2 = df_filtered
df2 = df2.withColumn('hashtags', F.split(F.regexp_replace("hashtags", r"(['\{\[\]\}\s:,]+)(text|indices|[0-9]+|')?(?!^[\p{L}]+$)", " "), " "))
df2 = df2.withColumn('hashtags', F.array_remove('hashtags',''))
df2 = df2.repartition(PARTITIONS)
df2.show()

+-------------------+--------------------+
|             userid|            hashtags|
+-------------------+--------------------+
|         1350490724|         [Ukrainian]|
|1437995794674638852|[SlavaUkraini, Fr...|
|1339977139391741953|[Italy, Ukraine️,...|
|          233973865|   [Ukraine, Russia]|
| 830065408170455040|   [Ukraine, WarDay]|
|          219802693|           [Salvini]|
|1496947213276221443|         [Anonymous]|
| 922217949318008835|     [Ukraine, Kyiv]|
|         1556455656|[disinformazione,...|
|1468608612134453248|[Ukraine, Russia,...|
|1504182118036951045|[UkraineWar, Tigr...|
|          110795440|           [Kherson]|
|          848832212|    [Macron, Russie]|
|         2967943317|           [Ukraine]|
|1503405304574271500|[EU, NATO, Butche...|
|            5734902|           [Ukraine]|
|          265809326|           [GRAMMYs]|
|          305526924|[Russia, Asia, Af...|
|          964964988|           [Ukraine]|
|          335369016|        [UkraineWar]|
+----------

#### get all unique userids, and join their hashtags

In [9]:
df3 = df2.groupBy('userid').agg(F.array_distinct(F.flatten(F.collect_list('hashtags'))).alias('hashtags'))
df3.repartition(PARTITIONS)
df3.show()

+--------------------+--------------------+
|              userid|            hashtags|
+--------------------+--------------------+
|                 #AI|           [Ukraine]|
|                #BRI|               [BRI]|
|            #Cuisine|            [Russie]|
|          #Hezbollah|              [Iran]|
|            #Hotwife|[Mariupol, Russia...|
|              #LGBTQ|  [StandWithUkraine]|
| #Nikolayev or #O...|         [Nikolayev]|
|    #President_Putin|            [madmen]|
|           #Wotblitz|[OmletArcade, Wor...|
| #gravity_propulsion|           [Ukraine]|
|            #musique|[Russie, UnfoldTh...|
|              #nazis|[Ukraine, BBC, Pr...|
| #președintele #P...|         [dictatori]|
|            #privacy|        [Nordstream]|
|           #sériesTV|   [ukraine, guerre]|
|             #trader|[phizer, VaccineS...|
|          #worldwar3|[Nuclear, War, WW...|
|        #டெரக்சாவின்|         [அமெரிக்கா]|
|               (City|             [Trump]|
|            0 Tweets|[Putin, Co

In [10]:
df3 = df3.filter(F.col('userid').rlike(r'^\d+$'))

In [11]:
df3 = df3.withColumn('hashtags', F.explode('hashtags')).withColumn('hashtags', F.trim(F.lower('hashtags')))
df3.show()
df3.repartition(PARTITIONS)

+-------------------+-------------+
|             userid|     hashtags|
+-------------------+-------------+
|1000010535423938560|   eurovision|
|1000033307969630211|        biden|
|1000033307969630211|    jameswebb|
|1000039842674364417|crimeanbridge|
|1000039842674364417|  fashionista|
|1000039842674364417|    braywyatt|
|1000048501210959872|     starlink|
|1000048501210959872|      ukraine|
|1000051190422495238|  vladikavkaz|
|1000051190422495238|       russia|
|1000051190422495238|     azovstal|
|1000051190422495238|     mariupol|
|1000059059532378112|         nato|
|1000082842364215296|      ukraine|
|1000085476097839104| fifaworldcup|
|1000085476097839104|      usavwal|
|1000089708406476802|   ukrainewar|
|1000102489490968576|      ukraine|
|1000129804333731840|      ukraine|
|1000129804333731840|papafrancesco|
+-------------------+-------------+
only showing top 20 rows



DataFrame[userid: string, hashtags: string]

### Flatten list of hashtags

In [12]:
df3 = df3.withColumn('hashtags', F.regexp_replace('hashtags', 'e️', 'e'))
df3 = df3.groupBy('hashtags').count().where(F.col('count') > CUTOFF_LIM).orderBy(F.desc('count'))
df3.repartition(PARTITIONS)
df3.show()

+----------------+------+
|        hashtags| count|
+----------------+------+
|         ukraine|837161|
|          russia|392753|
|           putin|277837|
|            nato|215889|
|        mariupol|173300|
|           biden|170790|
|standwithukraine|152878|
|         russian|129553|
|           bucha|114801|
|             usa|104222|
|        zelensky| 97234|
|       ukrainian| 96546|
|ukrainerussiawar| 93305|
|      eurovision| 90529|
|      ukrainewar| 90358|
|           china| 88881|
|          taiwan| 87510|
|    slavaukraini| 85305|
|              eu| 76389|
|      stoprussia| 73263|
+----------------+------+
only showing top 20 rows



In [13]:
df = df3.toPandas()

### Data Visualization

In [14]:
def output_country(country_name='World'):
    # try:
    global df
    try:
        os.mkdir(f'output/{country_name}')
    except IOError:
        print(f'{country_name} is already written to. Skipping...')
        return
    country_df = df
    country_name = country_name
    for n in range(10, 55, 5):
            build_barh(country_df, country_name, title=f'Top {n}', limit=n)
    build_choropleth(country_name)
    try:
        build_worldcloud(country_df, country_name)
    except ValueError:
        print(f'No hashtags survived cut-off. {country_name} returned empty.')

    # except:
    #     print(f'top level exception reached for country: {country_name} - is this country present in the .shp?')
    #     return

def build_choropleth(country_name):
    country_name = country_name.strip()
    plt.clf()
    gdf =  gpd.GeoDataFrame(gpd.read_file('shp/World_Countries__Generalized_.shp'))
    if 'Antarctica' in gdf.COUNTRY.values:  # bin Antarctica
        gdf = gdf[gdf.COUNTRY != 'Antarctica']
    if gdf.empty:
        print(f'The gdf object is empty, cannot plot with {country_name}.')
        return
    ax = gdf.plot(column='COUNTRY', cmap='cividis', figsize=(15, 10))
    ax.set_axis_off()
    ax.set_aspect(1.2)
    plt.subplots_adjust(left=0, right=1, bottom=0, top=1)# Turn off the tick marks and tick labels
    ax.set_yticks([])
    ax.set_xticks([])
    minx, maxx = gdf.total_bounds[0], gdf.total_bounds[2]
    miny, maxy = gdf.total_bounds[1], gdf.total_bounds[3]
    ax.set_xlim(minx, maxx)
    ax.set_ylim(miny, maxy)
    plt.tight_layout()
    plt.savefig(f'output/{country_name}/{country_name}.png')
    plt.close()

def build_worldcloud(df, country_name):
    plt.clf()
    mask = np.array(Image.open(f'output/{country_name}/{country_name}.png'))
    wc = WordCloud(background_color='white', mask=mask, contour_width=1, contour_color='black')
    wc.generate_from_frequencies(frequencies=df[['hashtags','count']].set_index('hashtags').to_dict()['count'])
    wc.to_file(f'output/{country_name}/{country_name} - wordCloud.png') # write to file
    plt.figure(figsize=(15, 10))
    plt.imshow(wc, interpolation='bilinear')
    plt.axis('off')
    plt.close()


def build_barh(df, country_name, title, limit: int):
    plt.clf()
    df = df.head(limit)
    title = f'{country_name} - {title}'
    x = df['count']
    y = df['hashtags']
    fig, ax = plt.subplots(figsize=(10, 6))
    cmap = cm.get_cmap('viridis')
    colors = cmap(x/x.max()*0.5)
    ax.barh(y=y, width=x, height=0.9, color=colors)
    ax.invert_yaxis()   
    ax.set_title(title)
    ax.set_xlabel('Frequency')
    ax.set_ylabel('Hashtags')
    ax.spines['right'].set_visible(False)
    ax.spines['top'].set_visible(False)
    plt.tight_layout()
    plt.savefig(f'output/{country_name}/{title}.png')
    plt.close()



In [15]:
try:
    os.mkdir('output')
except:
    print('file already exists, skipping mkdir...', flush=True)
finally:
    output_country()



<Figure size 640x480 with 0 Axes>