In [1]:
# Import PySpark
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


import re as re

# NLTK Library
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Numpy
import numpy as np

# Data Pre-processing functions of pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import size
from pyspark.sql.functions import udf, col
from pyspark.ml.feature import RegexTokenizer,StopWordsRemover

# String
import string
from pyspark.sql import Row
from pyspark.sql.functions import concat_ws
from pyspark.sql.types import StringType


# Operating System
import os
import sys

In [2]:
# Download item of NLTK
nltk.download('omw-1.4')
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\Vishv\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Vishv\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Vishv\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [3]:
# SparkSession Created
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
spark =SparkSession.builder.master("local[1]").appName('SparkProject').getOrCreate() 

In [4]:
# Read file from HDFS and storing to dataframe
# df = spark.read.option("multiline", "true").json("hdfs://localhost:9000/bigdata/data-analytics-tweet-wc-300mb-fresh.json")
# df = spark.read.option("multiline", "true").json("hdfs://localhost:9000/finalbigdata/finaldata/data-analytics-tweet-wc.json")
data = spark.read.option("multiline", "true").json("hdfs://localhost:9000/finalbigdata/finaldata/final-tweet-wc.json")

In [5]:
# Drop duplicate tweets in the data and filter data
data = data.dropDuplicates(['tweetid'])
#data = data.na.drop()
data = data.filter("content != ''")

In [6]:
# Taking only english language tweets
data = data[data.lang == "en"]

In [7]:
data.head(10)

[Row(content='#NewYear2022 with two major  football events; \n\n#AFCON2022 \n&amp;\n#WorldCup2022 \n\nOur best wishes to our national team ! https://t.co/FYlQNYekxa', date='2022-01-01T00:01:54+00:00', lang='en', likeCount=64, replyCount=2, retweetCount=3, tweetid=1477067523237593098, url='https://twitter.com/arryadiatv/status/1477067523237593098', user_location='Casablanca, Maroc'),
 Row(content='Drop a 💛 if you’re ready to see Brett score some bangers at The Lab! \n.\n#somosunidos #newmexico #soccer #newsigning #futbol #uslchampionship https://t.co/xQwXwpqi4A', date='2022-01-01T00:03:28+00:00', lang='en', likeCount=73, replyCount=12, retweetCount=2, tweetid=1477067916193398784, url='https://twitter.com/NewMexicoUTD/status/1477067916193398784', user_location='New Mexico, USA'),
 Row(content='the ball is just a ball, the enemy is just a human, and you, you are not a ordinary one, you are a SOCCER PLAYER!!!! #soccer', date='2022-01-01T00:04:52+00:00', lang='en', likeCount=0, replyCount=0

In [8]:
data.count()

825801

In [9]:
# data = data.drop('user_displayname','user_followersCount','user_friendsCount','user_id','user_url','user_username')

In [10]:
data.printSchema()

root
 |-- content: string (nullable = true)
 |-- date: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- likeCount: long (nullable = true)
 |-- replyCount: long (nullable = true)
 |-- retweetCount: long (nullable = true)
 |-- tweetid: long (nullable = true)
 |-- url: string (nullable = true)
 |-- user_location: string (nullable = true)



In [11]:
# Function to remove emoji from the content

def emoji_removing(string):
    emoji = re.compile("["u"\U0001F600-\U0001F64F"u"\U0001F300-\U0001F5FF"u"\U0001F680-\U0001F6FF"u"\U0001F1E0-\U0001F1FF"u"\U00002702-\U000027B0"u"\U000024C2-\U0001F251""]+", flags=re.UNICODE)
    return emoji.sub(r'', string)


In [12]:
# Function to remove number from the content

def number_removing(tweets):
    tweets = re.sub('([0-9]+)', '', tweets)
    return tweets

In [13]:
# Function to remove user with @ from the content

def user_removing(tweets):
    tweets = re.sub('(RT\s@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweets)
    tweets = re.sub('(@[A-Za-z]+[A-Za-z0-9-_]+)', '', tweets)
    return tweets

In [14]:
# Removing emojis, user and number from the content

emoji_removing=udf(emoji_removing)
user_removing=udf(user_removing)
number_removing=udf(number_removing)

In [15]:
data=data.withColumn('content', emoji_removing(data['content']))
data=data.withColumn('content', user_removing(data['content']))
data=data.withColumn('content', number_removing(data['content']))

In [16]:
# Removing unncessary charecters from the tweets

data = data.withColumn('content', F.regexp_replace('content', r'http\S+', ''))
data = data.withColumn('content', F.regexp_replace('content', '@\w+', ''))
data = data.withColumn('content', F.regexp_replace('content', '#', ''))
data = data.withColumn('content', F.regexp_replace('content', '[^\sa-zA-Z0-9]', ''))
data = data.withColumn("content",F.lower('content'))
data = data.withColumn("content",F.rtrim('content'))

In [17]:
data.head(10)

[Row(content='newyear with two major  football events \n\nafcon \namp\nworldcup \n\nour best wishes to our national team', date='2022-01-01T00:01:54+00:00', lang='en', likeCount=64, replyCount=2, retweetCount=3, tweetid=1477067523237593098, url='https://twitter.com/arryadiatv/status/1477067523237593098', user_location='Casablanca, Maroc'),
 Row(content='drop a  if youre ready to see brett score some bangers at the lab \n\nsomosunidos newmexico soccer newsigning futbol uslchampionship', date='2022-01-01T00:03:28+00:00', lang='en', likeCount=73, replyCount=12, retweetCount=2, tweetid=1477067916193398784, url='https://twitter.com/NewMexicoUTD/status/1477067916193398784', user_location='New Mexico, USA'),
 Row(content='the ball is just a ball the enemy is just a human and you you are not a ordinary one you are a soccer player soccer', date='2022-01-01T00:04:52+00:00', lang='en', likeCount=0, replyCount=0, retweetCount=0, tweetid=1477068270620798978, url='https://twitter.com/soccerlover0414

In [18]:
# Tokenization

tokenizer = RegexTokenizer().setPattern("[\\W_]+").setMinTokenLength(3).setInputCol("content").setOutputCol("tokens")
data = tokenizer.transform(data)

In [19]:
# Stopwords Removing

stopwordList = stopwords.words('english')
remover = StopWordsRemover(stopWords=stopwordList)
remover.setInputCol("tokens")
remover.setOutputCol("filtered")

data = remover.transform(data);

In [20]:
# Lemmatization

lemmatizer = WordNetLemmatizer()
def lemmatization(rows):
    rows = [lemmatizer.lemmatize(word,'v') for word in rows]
    return rows

lemmatization = udf(lemmatization)
data=data.withColumn('tokens', lemmatization(data['filtered']))

In [21]:
data.head()

Row(content='newyear with two major  football events \n\nafcon \namp\nworldcup \n\nour best wishes to our national team', date='2022-01-01T00:01:54+00:00', lang='en', likeCount=64, replyCount=2, retweetCount=3, tweetid=1477067523237593098, url='https://twitter.com/arryadiatv/status/1477067523237593098', user_location='Casablanca, Maroc', tokens='[newyear, two, major, football, events, afcon, amp, worldcup, best, wish, national, team]', filtered=['newyear', 'two', 'major', 'football', 'events', 'afcon', 'amp', 'worldcup', 'best', 'wishes', 'national', 'team'])

In [22]:
# Filtered the data after preprocessing the data

data = data.withColumn("filtered", concat_ws(" ", "filtered"))

In [23]:
# Drop tokens from the dataframe
# data=data.drop("tokens","filtered")
data=data.drop("tokens")

In [24]:
data.head()

Row(content='newyear with two major  football events \n\nafcon \namp\nworldcup \n\nour best wishes to our national team', date='2022-01-01T00:01:54+00:00', lang='en', likeCount=64, replyCount=2, retweetCount=3, tweetid=1477067523237593098, url='https://twitter.com/arryadiatv/status/1477067523237593098', user_location='Casablanca, Maroc', filtered='newyear two major football events afcon amp worldcup best wishes national team')

In [25]:
# Downlad the data as json
data.write.json('final-tweet-wc.json')

In [26]:
!pip install spacy



In [27]:
# Import geolocation data
import spacy
import pandas as pd

In [28]:
locate = spacy.load("en_core_web_sm")

In [59]:
# Storing location

z=[]
location = data.rdd.map(lambda x: x[8]).collect()

In [65]:
# Convert the user location to their respective country

for names in location:
    if names:
        loc = locate(names)
        for country in loc.ents:
            if country.label_ in ['GPE']:
                z.append(country.text)

In [70]:
df = pd.DataFrame(z,columns =['Country'])

In [71]:
df.head()

Unnamed: 0,Country
0,Casablanca
1,New Mexico
2,USA
3,England
4,England


In [77]:
count_df = df['Country'].value_counts()

In [85]:
count_df = pd.DataFrame(count_df)

In [86]:
count_df

Unnamed: 0,Country
Qatar,42193
England,29114
India,26749
Doha,21082
London,16746
...,...
stockholm,1
Malpura,1
Costa Del Sol,1
Magill,1


In [90]:
count_df.to_csv('country.csv')