In [1]:
import csv
from collections import defaultdict


In [2]:
def mapper(row):
    # Extract relevant information from the row
    tweet = row['tweet']
    likes = int(row['likes'])

    # Emit intermediate key-value pairs
    yield ('max_likes', likes)


In [3]:
def reducer(key, values):
    # Find the maximum number of likes
    max_likes = max(values)

    # Emit the final result
    yield (key, max_likes)


In [4]:
csv_file = 'JoeBidenTweets.csv'  # Path to your CSV file

# Perform MapReduce
likes_mapper = defaultdict(list)
with open(csv_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for key, value in mapper(row):
            likes_mapper[key].append(value)

# Collect the intermediate results
likes_reducer = {}
for key, values in likes_mapper.items():
    for result in reducer(key, values):
        likes_reducer[result[0]] = result[1]

# Output the final results
for key, value in likes_reducer.items():
    print(f'{key}: {value}')


max_likes: 1890946


# Getting minumum likes

In [5]:
def mapper(row):
    # Extract relevant information from the row
    tweet = row['tweet']
    likes = int(row['likes'])

    # Emit intermediate key-value pairs
    yield ('min_likes', likes)

def reducer(key, values):
    # Find the maximum number of likes
    min_likes = min(values)

    # Emit the final result
    yield (key, min_likes)
    
csv_file = 'JoeBidenTweets.csv'  # Path to your CSV file

# Perform MapReduce
likes_mapper = defaultdict(list)
with open(csv_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for key, value in mapper(row):
            likes_mapper[key].append(value)

# Collect the intermediate results
likes_reducer = {}
for key, values in likes_mapper.items():
    for result in reducer(key, values):
        likes_reducer[result[0]] = result[1]

# Output the final results
for key, value in likes_reducer.items():
    print(f'{key}: {value}')



min_likes: 2


# Getting word count of the tweet 

In [6]:
def mapper(row):
    # Extract relevant information from the row
    tweet = row['tweet']
    
    # Split the tweet into words
    words = tweet.split()

    # Emit intermediate key-value pairs
    for word in words:
        yield (word.lower(), 1)


In [7]:
def reducer(key, values):
    # Sum the counts of the words
    word_count = sum(values)

    # Emit the final result
    yield (key, word_count)


In [8]:
csv_file = 'JoeBidenTweets.csv'  # Path to your CSV file

# Perform MapReduce
word_counts_mapper = defaultdict(list)
with open(csv_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for key, value in mapper(row):
            word_counts_mapper[key].append(value)

# Collect the intermediate results
word_counts_reducer = {}
for key, values in word_counts_mapper.items():
    for result in reducer(key, values):
        word_counts_reducer[result[0]] = result[1]

# Output the final results
for key, value in word_counts_reducer.items():
    print(f'{key}: {value}')


tune: 180
in: 2996
11:30: 1
et: 35
tomorrow: 39
for: 2079
a: 3300
live: 92
webcast: 1
of: 3418
families: 106
usa: 1
presidential: 26
forum: 9
on: 1541
health: 383
care:: 1
http://presidentialforums.health08.org/: 1
iowans,: 2
there's: 57
good: 96
chance: 52
biden: 443
near: 14
you: 1063
today: 204
cool: 1
14: 3
f: 1
day:: 23
http://blog.joebiden.com/?p=1625: 1
we're: 185
excited: 19
to: 6947
announce: 5
that: 1263
@joebiden: 7
is: 1896
being: 101
rebooted: 1
the: 7894
2012: 2
campaign: 178
season: 6
give: 153
news: 23
vice: 88
president: 985
trail.: 3
staff: 13
will: 968
run: 19
this: 1713
account: 11
keep: 151
up: 454
date: 3
what: 397
vp's: 5
to,: 5
but: 570
you'll: 18
see: 121
occasional: 1
tweets: 6
from: 535
joe: 135
himself,: 1
too.: 15
morning:: 4
vp: 251
speak: 38
exeter,: 2
nh: 6
4/12: 1
tax: 116
fairness: 9
and: 5257
president’s: 24
support: 155
#buffettrule.: 2
4/12,: 1
his: 545
take: 379
why: 140
millionaires: 7
shouldn’t: 19
pay: 109
lower: 26
rate: 16
than: 331
middle: 24

unlike: 11
https://t.co/obne1op7bg: 1
sumter,: 2
live!: 7
https://t.co/rd1eksjxuo: 1
apply: 2
fellowship: 1
monday,: 2
july: 8
ins-and-outs: 1
digital: 1
eight-week: 1
program.: 6
apply:: 1
https://t.co/webnznq3dd: 1
https://t.co/hqlnmjiura: 1
williamsburg: 1
county,: 4
impacting: 7
rural: 19
access,: 4
https://t.co/ms61movtru: 1
https://t.co/v9asyisihf: 1
back-to-back: 1
titles!: 1
https://t.co/x22qgludfb: 1
cameron: 1
boyce: 1
year’s: 1
awards: 1
@itsonus.: 1
light: 13
gone: 7
https://t.co/4bpbqwqnro: 1
altered.: 1
https://t.co/zgult8y47v: 1
lifetime: 8
figure,: 1
selected: 1
judgment: 6
record,: 3
else’s.: 1
https://t.co/xozjhnl6xx: 1
eric,: 1
.@ericswalwell: 1
commend: 3
attention: 2
intelligence: 5
committee: 2
influential: 1
hall: 34
charleston,: 1
thoughtful: 2
questions: 9
epidemic,: 7
https://t.co/dbabofnq4v: 1
courts: 5
sabotage: 7
obamacare: 78
https://t.co/7seax4hzrd: 1
protects: 9
100: 37
aca.: 2
right—not: 1
aca: 15
heinous,: 1
despicable: 2
imaginable.: 1
inexcusably: 1


troops’: 1
iraq,: 1
responsibly: 1
resort.: 1
https://t.co/xxoyjc2bob: 1
commitments: 2
dealing: 2
lifesaving: 7
measures.: 3
polls,: 3
stretch:: 5
https://t.co/galb0s4lq6: 1
patriot.: 5
https://t.co/zou4sdktee: 1
https://t.co/mkh1hkqzjr: 1
https://t.co/op9bad4rye: 1
compare.: 1
https://t.co/132tb7mhaq: 1
https://t.co/nksx4dbbh3: 1
https://t.co/7uq8qftn0v: 1
https://t.co/8qv24014y7: 1
#proact: 1
https://t.co/449ezamu9s: 1
https://t.co/66ucpvp0jc: 1
https://t.co/blsiqfdehy: 1
https://t.co/htbapi8nkq: 1
damned: 1
siding: 1
https://t.co/q4ttyvjdvt: 1
closing: 5
argument: 1
possible:: 1
https://t.co/hwl9p6lqq4: 1
story:: 2
https://t.co/2udsxfscfy: 1
https://t.co/37biab9xtk: 2
https://t.co/vkxpwqmehg: 1
https://t.co/bwni6knzuu: 1
https://t.co/m2xhjjumst: 3
https://t.co/7bqdyrqfsk: 1
https://t.co/hfywsaw5cn: 1
https://t.co/yncbunklmh: 1
@ibew.: 1
https://t.co/tdbagzvdhs: 1
veiled: 1
lately,: 2
something:: 3
https://t.co/a5r0hmxomx: 1
mandela: 1
resumed: 1
life's: 1
apartheid.: 1
reconciliati

obstacles,: 1
wisdom,: 1
https://t.co/qging24mli: 1
@luisanaperezf,: 1
luisana.: 1
https://t.co/k842egj5tk: 1
https://t.co/utcldfd8jm: 1
darkness.: 4
wipes: 1
https://t.co/h4qnoqpmdp: 1
https://t.co/s4wiusflrl: 1
these,: 1
https://t.co/zrssx1gb2o: 1
light,: 2
belarusians: 1
chains: 1
https://t.co/i1wvnu0wkz: 1
angrier,: 1
hopeful,: 1
divided.: 1
path,: 2
reborn,: 1
light.: 3
laura: 1
https://t.co/nxl5j4iewe: 1
ruined: 1
doesn't.: 2
jacob: 4
blake: 4
car.: 1
https://t.co/9x7l25nq8d: 1
@jeffflake.: 1
https://t.co/3cane1yf0j: 1
other,: 1
https://t.co/9dpuqva3cq: 1
https://t.co/w8mrzqvinz: 1
.@realdonaldtrump: 2
https://t.co/amhxatxnpm: 1
it...: 2
https://t.co/vdycnwinbk: 1
focused,: 1
https://t.co/4oqntuyug7: 1
https://t.co/0xzoqjdflp: 1
https://t.co/ikyf4nu9dc: 1
https://t.co/vbz3jrqu84: 1
https://t.co/3h4twqsq4i: 1
https://t.co/sdfa92hfcb: 1
come,”: 1
https://t.co/sokkf9kqyo: 1
https://t.co/ml7nj94nks: 1
unseen.: 1
maine's: 1
lobstermen: 1
livelihood: 1
fishing: 2
https://t.co/juf1o2p3e

# Analyzing the sentiment of the tweet 

In [9]:
from nltk.sentiment import SentimentIntensityAnalyzer
sia = SentimentIntensityAnalyzer()


In [10]:
def mapper(row):
    # Extract relevant information from the row
    tweet = row['tweet']

    # Perform sentiment analysis
    sentiment_score = sia.polarity_scores(tweet)['compound']

    # Emit intermediate key-value pairs
    yield ('sentiment', sentiment_score)


In [11]:
def reducer(key, values):
    # Calculate the average sentiment score
    sentiment_sum = sum(values)
    sentiment_count = len(values)
    average_sentiment = sentiment_sum / sentiment_count

    # Emit the final result
    yield (key, average_sentiment)


In [12]:
csv_file = 'JoeBidenTweets.csv'  # Path to your CSV file

# Perform MapReduce
sentiment_mapper = defaultdict(list)
with open(csv_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for key, value in mapper(row):
            sentiment_mapper[key].append(value)

# Collect the intermediate results
sentiment_reducer = {}
for key, values in sentiment_mapper.items():
    for result in reducer(key, values):
        sentiment_reducer[result[0]] = result[1]

# Output the final results
for key, value in sentiment_reducer.items():
    print(f'{key}: {value}')


sentiment: 0.1730743733509233


# Analyzing user activity

In [13]:
from collections import defaultdict

# Define the Map function
def mapper(row):
    # Extract relevant information from the row
    user_id = row['id']
    timestamp = row['timestamp']
    # Emit intermediate key-value pairs
    yield (user_id, timestamp)

# Define the Reduce function
def reducer(key, values):
    # Perform analysis on the activity information for each user
    activity_count = len(values)
    # Emit the final results
    yield (key, activity_count) 

# Read and process the input dataset
input_file = 'JoeBidenTweets.csv'  # Path to your input dataset
user_activity_mapper = defaultdict(list)

# Perform Map step
with open(input_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for key, value in mapper(row):
            user_activity_mapper[key].append(value)

# Perform Reduce step
user_activity_reducer = {}
for key, values in user_activity_mapper.items():
    for result in reducer(key, values):
        user_activity_reducer[result[0]] = result[1]

# Output the final results
for key, value in user_activity_reducer.items():
    print(f'{key}: {value}')


361388562: 1
543984392: 1
189287227321356289: 1
189287350034104320: 1
189339650610036736: 1
189343279140184065: 1
189383473717460992: 1
189456416917032960: 1
189712253187145728: 1
189742771387637760: 1
189773314288402432: 1
189813040856825856: 1
190129116622761984: 1
190146175008112640: 1
190172245975502848: 1
190198707449372673: 1
190221525117448192: 1
190454419035398145: 1
190468063236337664: 1
190468245638221824: 1
190508166717517825: 1
190594251615240192: 1
190605642581549056: 1
190619904238563330: 1
190826443322818561: 1
190872265120940034: 1
190896173958168577: 1
190922422483042304: 1
191185984912752641: 1
191922916563222528: 1
191951432654663680: 1
192295285098156032: 1
192372300979519488: 1
192450734799847424: 1
192658120269500416: 1
192679673308782593: 1
192714383175069697: 1
192723684140654593: 1
192753468186959875: 1
193047199297445888: 1
193081247776505856: 1
193386289544970240: 1
193733508651425793: 1
194450021532442628: 1
194450291834359809: 1
194464639545909250: 1
194529

1236042621677690880: 1
1236075337164812288: 1
1236324479233449984: 1
1236343856699867137: 1
1236360367430717440: 1
1236366505979154432: 1
1236381353811120128: 1
1236408784492298240: 1
1236433064038780928: 1
1236449049865224194: 1
1236465665864880128: 1
1236623516226785283: 1
1236702218092515333: 1
1236727132044304387: 1
1236760575528464384: 1
1236769913995313152: 1
1236792814823849985: 1
1236805146312466432: 1
1236823760210141184: 1
1236834273526779904: 1
1237049339241005056: 1
1237073537342603264: 1
1237088318602579970: 1
1237100844559904769: 1
1237109652568944644: 1
1237133991855169536: 1
1237151428201897985: 1
1237162500757409794: 1
1237177600314753025: 1
1237373939837526016: 1
1237388993869774848: 1
1237410887650041856: 1
1237436556685627392: 1
1237470782260830208: 1
1237503497664258048: 1
1237529014329159681: 1
1237538649207492609: 1
1237545869529845760: 1
1237571284042534913: 1
1237576118099574788: 1
1237585402816425988: 1
1237595289491517440: 1
1237739049735073797: 1
12377627069

# Top N likes from the dataset

In [14]:
from heapq import nlargest

# Define the Map function
def mapper(row):
    # Extract relevant information from the row
    tweet_id = row['id']
    likes = int(row['likes'])
    # Emit intermediate key-value pair
    yield ('key', (tweet_id, likes))

# Define the Reduce function
def reducer(key, values):
    # Sort the values based on likes in descending order
    top_n = nlargest(N, values, key=lambda x: x[1])
    # Emit the top N tuples with highest likes
    for tweet in top_n:
        yield tweet

# Read and process the input dataset
input_file = 'JoeBidenTweets.csv'  # Path to your input dataset
top_n = 5  # Specify the value of N

# Perform Map step
with open(input_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for result in mapper(row):
            print(result)  # Emit the intermediate key-value pairs

# Perform Reduce step
# Assuming you have collected the emitted intermediate key-value pairs into a list called 'intermediate_pairs'
grouped_pairs = {'key': []}  # Group all records under the same key
grouped_pairs['key'].extend(grouped_pairs)

# Find the top N likes
top_n_likes = []
for key, values in grouped_pairs.items():
    for result in reducer(key, values):
        top_n_likes.append(result)

# Output the top N likes
for tweet in top_n_likes:
    print(f"Tweet ID: {tweet[0]}, Likes: {tweet[1]}")


('key', ('361388562', 11))
('key', ('543984392', 22))
('key', ('189287227321356289', 20))
('key', ('189287350034104320', 51))
('key', ('189339650610036736', 5))
('key', ('189343279140184065', 6))
('key', ('189383473717460992', 51))
('key', ('189456416917032960', 12))
('key', ('189712253187145728', 137))
('key', ('189742771387637760', 5))
('key', ('189773314288402432', 7))
('key', ('189813040856825856', 3))
('key', ('190129116622761984', 11))
('key', ('190146175008112640', 4))
('key', ('190172245975502848', 208))
('key', ('190198707449372673', 3))
('key', ('190221525117448192', 7))
('key', ('190454419035398145', 21))
('key', ('190468063236337664', 2))
('key', ('190468245638221824', 2))
('key', ('190508166717517825', 4))
('key', ('190594251615240192', 8))
('key', ('190605642581549056', 7))
('key', ('190619904238563330', 62))
('key', ('190826443322818561', 4))
('key', ('190872265120940034', 7))
('key', ('190896173958168577', 14))
('key', ('190922422483042304', 5))
('key', ('19118598491275

NameError: name 'N' is not defined

# Retweet analysis

In [15]:
# Define the Map function
def mapper(row):
    # Extract relevant information from the row
    tweet_id = row['id']
    retweet_count = int(row['retweets'])
    # Emit intermediate key-value pair
    yield ('key', (tweet_id, retweet_count))

# Define the Reduce function
def reducer(key, values):
    # Perform retweet analysis, such as finding the maximum retweet count
    max_retweet_count = max(values, key=lambda x: x[1])
    # Emit the analysis result
    yield max_retweet_count

# Read and process the input dataset
input_file = 'JoeBidenTweets.csv'  # Path to your input dataset

# Perform Map step
with open(input_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for result in mapper(row):
            print(result)  # Emit the intermediate key-value pairs

# Perform Reduce step
# Assuming you have collected the emitted intermediate key-value pairs into a list called 'intermediate_pairs'
grouped_pairs = {'key': []}  # Group all records under the same key
grouped_pairs['key'].extend(intermediate_pairs)

# Perform retweet analysis
retweet_analysis_result = []
for key, values in grouped_pairs.items():
    for result in reducer(key, values):
        retweet_analysis_result.append(result)

# Output the retweet analysis result
for tweet in retweet_analysis_result:
    print(f"Tweet ID: {tweet[0]}, Retweet Count: {tweet[1]}")


('key', ('361388562', 5))
('key', ('543984392', 16))
('key', ('189287227321356289', 82))
('key', ('189287350034104320', 76))
('key', ('189339650610036736', 54))
('key', ('189343279140184065', 52))
('key', ('189383473717460992', 471))
('key', ('189456416917032960', 29))
('key', ('189712253187145728', 217))
('key', ('189742771387637760', 21))
('key', ('189773314288402432', 44))
('key', ('189813040856825856', 26))
('key', ('190129116622761984', 29))
('key', ('190146175008112640', 11))
('key', ('190172245975502848', 443))
('key', ('190198707449372673', 8))
('key', ('190221525117448192', 64))
('key', ('190454419035398145', 89))
('key', ('190468063236337664', 19))
('key', ('190468245638221824', 12))
('key', ('190508166717517825', 23))
('key', ('190594251615240192', 26))
('key', ('190605642581549056', 27))
('key', ('190619904238563330', 268))
('key', ('190826443322818561', 15))
('key', ('190872265120940034', 17))
('key', ('190896173958168577', 89))
('key', ('190922422483042304', 24))
('key', 

('key', ('1207355897191776257', 122))
('key', ('1207371953348079616', 157))
('key', ('1207408947054776320', 216))
('key', ('1207425053312180224', 1667))
('key', ('1207439146031206411', 460))
('key', ('1207458021539926017', 150))
('key', ('1207464563781832706', 214))
('key', ('1207479041256755201', 10510))
('key', ('1207690082553139201', 2040))
('key', ('1207718244582342656', 2878))
('key', ('1207785176169877510', 1040))
('key', ('1207808580474327041', 975))
('key', ('1207824644163670016', 147))
('key', ('1207825781965164550', 155))
('key', ('1207829446075539457', 315))
('key', ('1207833664530370560', 636))
('key', ('1207837874726559747', 176))
('key', ('1207838763826716675', 475))
('key', ('1207839405504974853', 272))
('key', ('1207840984928849920', 1810))
('key', ('1207842283455991809', 266))
('key', ('1207843729454829573', 285))
('key', ('1207846568767954944', 397))
('key', ('1207859230675537920', 418))
('key', ('1207862146534649856', 276))
('key', ('1207865574795161600', 249))
('key

('key', ('1322865972819251200', 3654))
('key', ('1322871257902145536', 4211))
('key', ('1322876039144636417', 1143))
('key', ('1322881072363917312', 2255))
('key', ('1322886357535150081', 270))


NameError: name 'intermediate_pairs' is not defined

# Hashtag analysis

In [19]:
import re
# Define the Map function
def mapper(row):
    # Extract relevant information from the row
    tweet_id = row['id']
    likes = int(row['likes'])
    tweet_text = row['tweet']
    
    # Extract hashtags using regular expressions
    hashtags = re.findall(r'#(\w+)', tweet_text)
    
    # Emit intermediate key-value pairs for each hashtag
    for hashtag in hashtags:
        yield (hashtag, (tweet_id, likes))


In [20]:
# Define the Reduce function
def reducer(hashtag, values):
    total_likes = sum(likes for _, likes in values)
    
    # Emit the hashtag and its total likes
    yield (hashtag, total_likes)


In [21]:
from heapq import nlargest

input_file = 'JoeBidenTweets.csv'  # Path to your input dataset
top_n = 5  # Specify the value of N

# Perform Map step
grouped_pairs = {}
with open(input_file, 'r') as file:
    reader = csv.DictReader(file)
    for row in reader:
        for result in mapper(row):
            hashtag = result[0]
            tweet_info = result[1]
            
            # Group intermediate pairs by the hashtag
            if hashtag not in grouped_pairs:
                grouped_pairs[hashtag] = []
            grouped_pairs[hashtag].append(tweet_info)

# Perform Reduce step
top_n_hashtags = nlargest(top_n, grouped_pairs.items(), key=lambda x: sum(likes for _, likes in x[1]))

# Output the top N hashtags
for hashtag, likes in top_n_hashtags:
    print(f"Hashtag: {hashtag}, Total Likes: {sum(likes for _, likes in likes)}")


Hashtag: DemConvention, Total Likes: 1736647
Hashtag: DemDebate, Total Likes: 840162
Hashtag: charlottesville, Total Likes: 622953
Hashtag: BidenTownHall, Total Likes: 473203
Hashtag: NationalComingOutDay, Total Likes: 323095


2023-05-25 14:56:12,021 WARN spark.HeartbeatReceiver: Removing executor driver with no recent heartbeats: 862402 ms exceeds timeout 120000 ms
2023-05-25 14:56:12,649 WARN spark.SparkContext: Killing executors is not supported by current scheduler.
