[PySpark](https://spark.apache.org/docs/3.3.2/api/python/getting_started/index.html)

https://spark.apache.org/examples.html

# Exporting Data from MongoDB to a CSV File


As a sports analyst, I want to report on which countries are most popular on Twitter. A good way to approach this is to find which countries are mentioned most often in the tweets in our dataset, and to analyze which words appear most often in those tweets.

In [1]:
!ls

 country-list.csv		     SoccerTweetAnalysis.ipynb
'SoccerTweetAnalysis (copy).ipynb'


In [2]:
!pwd

/home/hadoop/Downloads/big-data-3/final-project


In [3]:
import pandas as pd

In [4]:
footy_tweets = pd.read_csv('/home/hadoop/Downloads/big-data-3/mongodb/dump/football_tweets.csv', 
                           on_bad_lines='skip',
                           header = None
                          )
footy_tweets.sample(3)

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14
6491,"{""_id"":{""$oid"":""57967032c38159226b4c88cb""}","user_name:""TheDelgadoWay""",retweet_count:0,tweet_followers_count:1570,"source:""<a href=\""https://mobile.twitter.com\""...",coordinates:null,tweet_mentioned_count:1,"tweet_ID:""757667207870644224""","tweet_text:""RT @OldDaysFootball: Throwback to ...","user:{""CreatedAt"":{""$date"":""2010-07-11T05:51:4...",FavouritesCount:561,FollowersCount:1570,FriendsCount:979,UserId:165309726,"Location:""|Bernabeu|Mars|Compton|Nigeria""}}"
829,"{""_id"":{""$oid"":""5796562ac381590a1c83c176""}","user_name:""GaryBooth1""",retweet_count:0,tweet_followers_count:334,"source:""<a href=\""http://twitter.com/download/...",coordinates:null,tweet_mentioned_count:1,"tweet_ID:""757639259067146245""","tweet_text:""RT @TSBible: Throwback to FIFA 94 ...","user:{""CreatedAt"":{""$date"":""2011-08-30T17:23:1...",FavouritesCount:686,FollowersCount:334,FriendsCount:657,UserId:364978620,"Location:""Glasgow""}}"
3984,"{""_id"":{""$oid"":""579667f8c381591baabef3a3""}","user_name:""PlayoffFootball""",retweet_count:0,tweet_followers_count:18190,"source:""<a href=\""http://twitter.com/download/...",coordinates:null,tweet_mentioned_count:1,"tweet_ID:""757658377019424770""","tweet_text:""RT @ESPNU: In 40 days we'll all be...","user:{""CreatedAt"":{""$date"":""2013-11-21T23:21:3...",FavouritesCount:8735,FollowersCount:18190,FriendsCount:18882,UserId:-2086991098,"Location:""#CFP""}}"


In [5]:
footy_tweets.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6944 entries, 0 to 6943
Data columns (total 15 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   0       6944 non-null   object
 1   1       6944 non-null   object
 2   2       6944 non-null   object
 3   3       6944 non-null   object
 4   4       6944 non-null   object
 5   5       6944 non-null   object
 6   6       6944 non-null   object
 7   7       6944 non-null   object
 8   8       6944 non-null   object
 9   9       6944 non-null   object
 10  10      6944 non-null   object
 11  11      6944 non-null   object
 12  12      6944 non-null   object
 13  13      6944 non-null   object
 14  14      6944 non-null   object
dtypes: object(15)
memory usage: 813.9+ KB
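
The 15 unnamed columns suggest that each line of the export is a JSON document rather than a comma-separated row, so a comma-delimited read cuts every record apart at its commas. A minimal sketch of the difference, using only the standard library and a shortened, made-up record (`sample_line` is illustrative, not a row from the dataset):

```python
import json

# Hypothetical, shortened record in the same shape as the exported tweets.
sample_line = (
    '{"user_name":"example_user","retweet_count":0,'
    '"tweet_text":"Hello, world","user":{"Location":"Glasgow"}}'
)

# Splitting on commas cuts through nested objects and through text that contains commas.
comma_pieces = sample_line.split(",")

# Parsing the line as JSON recovers the named fields intact.
record = json.loads(sample_line)

print(len(comma_pieces))      # number of comma-separated fragments
print(sorted(record))         # field names recovered by the JSON parser
print(record["tweet_text"])   # the tweet text, with its comma preserved
```

If the file really is one JSON document per line, `pd.read_json(path, lines=True)` may be a better fit than `pd.read_csv`; this has not been checked against the actual export.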


In [6]:
country_list = pd.read_csv('country-list.csv')
country_list.sample(3)

Unnamed: 0,Afghanistan,AFG
13,Bahamas,BAH
152,Republic of Ireland,IRL
192,Tunisia,TUN


In [8]:
import pyspark
sc = pyspark.SparkContext()

23/04/06 00:02:41 WARN Utils: Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/04/06 00:02:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/06 00:02:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
# Import and create a new SQLContext 
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)



In [112]:
# Read the country CSV file into an RDD.
country_lines = sc.textFile('file:///home/hadoop/Downloads/big-data-3/final-project/country-list.csv')

In [159]:
country_lines.take(5)

['Afghanistan, AFG',
 'Albania, ALB',
 'Algeria, ALG',
 'American Samoa, ASA',
 'Andorra, AND']

In [126]:
# Convert each line into a (country, code) tuple
country = country_lines.map(lambda line : tuple(line.split(", ")))

In [127]:
country.take(5)

                                                                                

[('Afghanistan', 'AFG'),
 ('Albania', 'ALB'),
 ('Algeria', 'ALG'),
 ('American Samoa', 'ASA'),
 ('Andorra', 'AND')]

In [132]:
# Convert each line into a [country, code] list
country_tuples =  country_lines.map(lambda word : word.split(", "))

In [133]:
country_tuples.take(5)

[['Afghanistan', 'AFG'],
 ['Albania', 'ALB'],
 ['Algeria', 'ALG'],
 ['American Samoa', 'ASA'],
 ['Andorra', 'AND']]
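
The two `map` calls above differ only in whether each line becomes a tuple or a list. As a small sketch of the same parsing step in plain Python, the hypothetical helper `parse_country_line` below also strips stray whitespace and splits on the last comma only, assuming the code is always the final field. The third sample line is made up to show the whitespace handling.

```python
def parse_country_line(line):
    """Split a 'Country, CODE' line into a (country, code) tuple."""
    # rpartition splits on the last comma, so a comma inside the name would survive.
    name, _, code = line.rpartition(",")
    return (name.strip(), code.strip())

lines = ["Afghanistan, AFG", "American Samoa, ASA", "Antigua and Barbuda,ATG "]
pairs = [parse_country_line(line) for line in lines]
print(pairs)
```

With Spark, the same function could be passed to the RDD, for example `country_lines.map(parse_country_line)`. A line with no comma would yield an empty country name, so such lines may need filtering first.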

In [138]:
# Create the DataFrame, look at schema and contents
countryDF = sqlContext.createDataFrame(country_tuples, ["country", "code"])
# countryDF = sqlContext.createDataFrame(country, ["country", "code"])
countryDF.printSchema()
countryDF.take(3)

root
 |-- country: string (nullable = true)
 |-- code: string (nullable = true)



[Row(country='Afghanistan', code='AFG'),
 Row(country='Albania', code='ALB'),
 Row(country='Algeria', code='ALG')]

In [139]:
countryDF.show()

+-------------------+----+
|            country|code|
+-------------------+----+
|        Afghanistan| AFG|
|            Albania| ALB|
|            Algeria| ALG|
|     American Samoa| ASA|
|            Andorra| AND|
|             Angola| ANG|
|           Anguilla| AIA|
|Antigua and Barbuda| ATG|
|          Argentina| ARG|
|            Armenia| ARM|
|              Aruba| ARU|
|          Australia| AUS|
|            Austria| AUT|
|         Azerbaijan| AZE|
|            Bahamas| BAH|
|            Bahrain| BHR|
|         Bangladesh| BAN|
|           Barbados| BRB|
|            Belarus| BLR|
|            Belgium| BEL|
+-------------------+----+
only showing top 20 rows



In [143]:
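# Read the tweets file into RDDs of lines (despite the .csv extension, each line is a JSON document).
# footy_tw and tweets_lines both point at the same file.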
footy_tw = sc.textFile('/home/hadoop/Downloads/big-data-3/mongodb/dump/football_tweets.csv')
tweets_lines = sc.textFile('/home/hadoop/Downloads/big-data-3/mongodb/dump/football_tweets.csv')

In [141]:
footy_tw.count()



11188

In [142]:
footy_tw.take(1)

['{"_id":{"$oid":"578ffa8e7eb9513f4f55a935"},"user_name":"koteras","retweet_count":0,"tweet_followers_count":461,"source":"<a href=\\"http://twitter.com/download/iphone\\" rel=\\"nofollow\\">Twitter for iPhone</a>","coordinates":null,"tweet_mentioned_count":1,"tweet_ID":"755891629932675072","tweet_text":"RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL","user":{"CreatedAt":{"$date":"2011-12-27T09:04:01Z"},"FavouritesCount":5223,"FollowersCount":461,"FriendsCount":619,"UserId":447818090,"Location":"501"}}']

In [80]:
# Split each line on commas into raw JSON fragments (this does not isolate the tweet text)
footy_tw_lines = footy_tw.flatMap(lambda line : line.split(","))

In [82]:
footy_tw_lines.take(10)

['{"_id":{"$oid":"578ffa8e7eb9513f4f55a935"}',
 '"user_name":"koteras"',
 '"retweet_count":0',
 '"tweet_followers_count":461',
 '"source":"<a href=\\"http://twitter.com/download/iphone\\" rel=\\"nofollow\\">Twitter for iPhone</a>"',
 '"coordinates":null',
 '"tweet_mentioned_count":1',
 '"tweet_ID":"755891629932675072"',
 '"tweet_text":"RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING  https://t.co/BFnV6jfkBL"',
 '"user":{"CreatedAt":{"$date":"2011-12-27T09:04:01Z"}']
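
Splitting on commas yields fragments of JSON rather than the tweet text. Since each line shown by `take(1)` looks like a complete JSON document, one alternative is to parse the line and read its `tweet_text` field. A minimal sketch, assuming every line is either valid JSON or safe to skip (`extract_tweet_text` is a hypothetical helper, and the sample line is shortened from the output above):

```python
import json

def extract_tweet_text(line):
    """Return the tweet_text field of a JSON line, or '' if missing or unparsable."""
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        return ""
    if not isinstance(record, dict):
        return ""
    text = record.get("tweet_text")
    # A null or non-string tweet_text is treated as empty.
    return text if isinstance(text, str) else ""

sample = (
    '{"user_name":"koteras","retweet_count":0,'
    '"tweet_text":"RT @ochocinco: I beat them all for 10 straight hours #FIFA16KING"}'
)
print(extract_tweet_text(sample))
print(repr(extract_tweet_text("not json")))
```

With Spark, this could be applied as `footy_tw.map(extract_tweet_text)`, which would give an RDD of tweet texts to clean and count.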

In [162]:
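# Perform WordCount on the raw tweet lines: split on spaces, pair each word with 1, then sum per word.
# Because whole JSON lines are split, some tokens contain JSON field residue rather than tweet words.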
tweets_words = tweets_lines.flatMap(lambda line: line.split(' '))
tweets_tuples = tweets_words.map(lambda word: (word, 1))
tweets_counts = tweets_tuples.reduceByKey(lambda a, b: (a + b))
tweets_counts.take(15)

                                                                                

[('rel=\\"nofollow\\">Twitter', 8609),
 ('beat', 51),
 ('them', 69),
 ('10', 114),
 ('hours', 58),
 ('#FIFA16KING', 27),
 ('', 2870),
 ('https://t.co/BFnV6jfkBL","user":{"CreatedAt":{"$date":"2011-12-27T09:04:01Z"},"FavouritesCount":5223,"FollowersCount":461,"FriendsCount":619,"UserId":447818090,"Location":"501"}}',
  1),
 ('@Louis_Tomlinson', 3),
 ('@socceraid', 3),
 ('when', 3905),
 ('retired', 3),
 ('of', 1375),
 ('knee', 14),
 ('.', 34)]
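
The three steps above (split into words, pair each word with 1, sum the pairs by key) compute the same result as a plain dictionary count. A small pure-Python sketch on made-up text, which can help check the logic without Spark:

```python
from collections import Counter

# Made-up tweet texts for illustration.
texts = ["France beat Germany", "Germany when France", "when when"]

# flatMap step: one flat list of words across all texts.
words = [word for text in texts for word in text.split(" ")]

# map + reduceByKey steps: count the occurrences of each word.
counts = Counter(words)

print(counts.most_common(3))
```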

## Clean the data: some tweets are empty. Remove the empty tweets using filter() 

In [83]:
example1 = sc.parallelize(["'a'", 'b', "  ",'c', 'd', ""])
example1.filter(lambda x: len(x) != 0).collect()



["'a'", 'b', '  ', 'c', 'd']

In [85]:
example1.filter(lambda x: x == "'a'").collect()

["'a'"]

In [79]:
example2 = sc.parallelize(['a', 'b', 'c', 'd', ""])
example2.filter(lambda x: len(x) != 0).collect()

['a', 'b', 'c', 'd']
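
Note that `example1` keeps the whitespace-only string `'  '`, because its length is not zero. If blank strings should also count as empty, one option is to strip whitespace before testing. A sketch with a hypothetical predicate `is_non_empty`, shown on the same values in plain Python:

```python
def is_non_empty(text):
    """True when the string contains at least one non-whitespace character."""
    return bool(text.strip())

values = ["'a'", "b", "  ", "c", "d", ""]
kept = [value for value in values if is_non_empty(value)]
print(kept)
```

The same predicate could be passed to Spark as `example1.filter(is_non_empty)`.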

In [38]:
help(footy_tw.filter)

Help on method filter in module pyspark.rdd:

filter(f: Callable[[~T], bool]) -> 'RDD[T]' method of pyspark.rdd.RDD instance
    Return a new RDD containing only the elements that satisfy a predicate.
    
    Examples
    --------
    >>> rdd = sc.parallelize([1, 2, 3, 4, 5])
    >>> rdd.filter(lambda x: x % 2 == 0).collect()
    [2, 4]



In [93]:
# Check for records whose tweet_text is null; an empty result means there are none to remove.
footy_tw_lines.filter(lambda x: '"tweet_text":null'  in x ).collect()

                                                                                

[]

In [86]:
'a' in 'apple'

True

In [145]:
# Count the number of distinct words produced by the word count
tweets_counts.count()

                                                                                

56033

In [163]:
# Create the DataFrame of tweet word counts
tweet_df = sqlContext.createDataFrame(tweets_counts, ['word', 'count'])
tweet_df.printSchema()
tweet_df.take(5)

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)



[Row(word='rel=\\"nofollow\\">Twitter', count=8609),
 Row(word='beat', count=51),
 Row(word='them', count=69),
 Row(word='10', count=114),
 Row(word='hours', count=58)]

In [148]:
# Join the country and tweet data frames (on the appropriate column)
join_df = tweet_df.join(countryDF, tweet_df.word == countryDF.country)
join_df.take(5)

                                                                                

[Row(word='Chad', count=10, country='Chad', code='CHA'),
 Row(word='Iraq', count=6, country='Iraq', code='IRQ'),
 Row(word='Germany', count=14, country='Germany', code='GER'),
 Row(word='Jordan', count=6, country='Jordan', code='JOR'),
 Row(word='France', count=39, country='France', code='FRA')]

In [151]:
join_df.show()

                                                                                

+---------+-----+---------+----+
|     word|count|  country|code|
+---------+-----+---------+----+
|     Chad|   10|     Chad| CHA|
|     Iraq|    6|     Iraq| IRQ|
|  Germany|   14|  Germany| GER|
|   Jordan|    6|   Jordan| JOR|
|   France|   39|   France| FRA|
|   Greece|    1|   Greece| GRE|
|Argentina|    1|Argentina| ARG|
|  Albania|    1|  Albania| ALB|
|    Ghana|    4|    Ghana| GHA|
|    India|    6|    India| IND|
|    Chile|    1|    Chile| CHI|
|    Italy|    1|    Italy| ITA|
|  Denmark|    1|  Denmark| DEN|
|     Iran|    1|     Iran| IRN|
|  Iceland|    2|  Iceland| ISL|
|   Israel|    2|   Israel| ISR|
|  Georgia|    4|  Georgia| GEO|
|  Jamaica|    2|  Jamaica| JAM|
|   Guinea|    8|   Guinea| GUI|
|   Canada|   12|   Canada| CAN|
+---------+-----+---------+----+
only showing top 20 rows
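
Because the join compares whole space-separated tokens with country names, it can only match single-word names written exactly as in the list; a name such as `Republic of Ireland` or a token such as `France!` would not match. One possible refinement is to search each tweet text for every country name. A minimal pure-Python sketch on made-up data (`count_country_mentions` is hypothetical, and this is a different technique from the join used above):

```python
import re
from collections import Counter

def count_country_mentions(texts, countries):
    """Count, per country, how many times its name appears as a whole phrase."""
    counts = Counter()
    for country in countries:
        # \b anchors keep 'Niger' from matching inside 'Nigeria'.
        pattern = re.compile(r"\b" + re.escape(country) + r"\b")
        for text in texts:
            counts[country] += len(pattern.findall(text))
    return counts

# Made-up texts and a made-up country list for illustration.
texts = ["Go France!", "Republic of Ireland vs France", "Nigeria wins"]
countries = ["France", "Republic of Ireland", "Niger", "Nigeria"]
mentions = count_country_mentions(texts, countries)
print(mentions)
```

This loops over every country for every text, which is fine for a few thousand tweets but would need a different design at larger scale.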



In [149]:
# Question 1: number of distinct countries mentioned
join_df.count()

                                                                                

49

In [150]:
# Question 2: total number of country mentions across all tweets.
# Alias the import so it does not shadow Python's built-in sum().
from pyspark.sql.functions import sum as spark_sum
join_df.select(spark_sum('count')).show()

                                                                                

+----------+
|sum(count)|
+----------+
|       384|
+----------+



In [152]:
# Table 1: top three countries and their counts.
from pyspark.sql.functions import desc
join_df.sort(desc('count')).show(3)

                                                                                

+-------+-----+-------+----+
|   word|count|country|code|
+-------+-----+-------+----+
|Nigeria|   54|Nigeria| NGA|
| France|   39| France| FRA|
|England|   31|England| ENG|
+-------+-----+-------+----+
only showing top 3 rows



In [164]:
# Table 2: counts for Wales, Iceland, Japan, Kenya, and Netherlands.
join_df[join_df.country == 'Wales'].show()
join_df[join_df.country == 'Iceland'].show()
join_df[join_df.country == 'Japan'].show()
join_df[join_df.country == 'Kenya'].show()
join_df[join_df.country == 'Netherlands'].show()

                                                                                

+-----+-----+-------+----+
| word|count|country|code|
+-----+-----+-------+----+
|Wales|   20|  Wales| WAL|
+-----+-----+-------+----+



                                                                                

+-------+-----+-------+----+
|   word|count|country|code|
+-------+-----+-------+----+
|Iceland|    2|Iceland| ISL|
+-------+-----+-------+----+



                                                                                

+-----+-----+-------+----+
| word|count|country|code|
+-----+-----+-------+----+
|Japan|    5|  Japan| JPN|
+-----+-----+-------+----+



                                                                                

+-----+-----+-------+----+
| word|count|country|code|
+-----+-----+-------+----+
|Kenya|    3|  Kenya| KEN|
+-----+-----+-------+----+





+-----------+-----+-----------+----+
|       word|count|    country|code|
+-----------+-----+-----------+----+
|Netherlands|   13|Netherlands| NED|
+-----------+-----+-----------+----+



                                                                                

In [165]:
# Question 6: average number of times a country is mentioned in tweets.
from pyspark.sql.functions import mean
join_df.select(mean('count')).show()

                                                                                

+-----------------+
|       avg(count)|
+-----------------+
|7.836734693877551|
+-----------------+
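
The average follows directly from the two earlier results: 384 total mentions divided by 49 matched countries. A quick check in plain Python, using the figures reported above:

```python
total_mentions = 384      # sum(count) from Question 2
distinct_countries = 49   # join_df.count() from Question 1

average_mentions = total_mentions / distinct_countries
print(average_mentions)
```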

