# EMR Hadoop Streaming Word Frequency (MapReduce): All-in-One Notebook (Synchronization Issue Fixed)

This notebook runs an end-to-end **word frequency** job on **Amazon EMR** using **Hadoop Streaming**.

It will:
1. Create a small sample `logs.txt`
2. Write `mapper.py` and `reducer.py`
3. Upload input + code to S3
4. Run Hadoop Streaming (`hadoop-streaming.jar`)
5. Download and preview results

## Assumptions
- You are running this notebook **on an EMR cluster** (e.g., EMR Studio / JupyterLab attached to a cluster)
- `aws` CLI is available and your EMR role has S3 read/write permissions
- `hadoop` CLI is available
- Hadoop streaming jar exists at `/usr/lib/hadoop-mapreduce/hadoop-streaming.jar`


## 0) Configure your S3 paths
Set your bucket and prefix. The output path must not already exist, because Hadoop refuses to write into an existing output directory.
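
One way to satisfy the "output must not exist" rule without deleting earlier results is to give each run its own output prefix. The sketch below is an assumption rather than part of this notebook: `timestamped_output` is a hypothetical helper, and `example-bucket` is a placeholder name.

```python
from datetime import datetime, timezone

def timestamped_output(s3_base, now=None):
    """Return a per-run output URI under s3_base (hypothetical helper)."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    return f"{s3_base.rstrip('/')}/output-{stamp}/"

# Fixed timestamp so the example is reproducible; omit `now` in real use.
example = timestamped_output(
    "s3://example-bucket/mapreduce/wordcount_demo",
    now=datetime(2026, 2, 2, 1, 6, 16, tzinfo=timezone.utc),
)
print(example)
# s3://example-bucket/mapreduce/wordcount_demo/output-20260202T010616Z/
```

The trade-off is that old outputs accumulate in the bucket until they are cleaned up separately.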


In [4]:
import os, subprocess, textwrap
from pathlib import Path

S3_BUCKET = os.environ.get('S3_BUCKET', 'aws-logs-346690756907-us-east-1')
PREFIX = os.environ.get('MR_PREFIX', 'mapreduce/wordcount_demo')

S3_BASE = f"s3://{S3_BUCKET}/{PREFIX}".rstrip('/')
S3_INPUT = f"{S3_BASE}/input/"
S3_CODE = f"{S3_BASE}/code/"
S3_OUTPUT = f"{S3_BASE}/output/"

print('S3_INPUT :', S3_INPUT)
print('S3_CODE  :', S3_CODE)
print('S3_OUTPUT:', S3_OUTPUT)

if 'YOUR_BUCKET_NAME' in S3_BUCKET:
    print('\n⚠️  Set S3_BUCKET before running upload/job steps.')

def run(cmd, check=True):
    print('»', ' '.join(cmd))
    p = subprocess.run(cmd, text=True, capture_output=True)
    if p.stdout:
        print(p.stdout)
    if p.stderr:
        print(p.stderr)
    if check and p.returncode != 0:
        raise RuntimeError(f"Command failed with exit code {p.returncode}")
    return p


S3_INPUT : s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/input/
S3_CODE  : s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/code/
S3_OUTPUT: s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/

## 1) Sanity checks
Verify we can find `aws`, `hadoop`, and the Hadoop Streaming jar.
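
As an alternative to shelling out to `which`, the same check can be sketched in pure Python with `shutil.which`. The helper name `missing_tools` is illustrative and not part of the notebook.

```python
import shutil

def missing_tools(names):
    """Return the names from `names` that cannot be found on PATH."""
    return [name for name in names if shutil.which(name) is None]

# On a correctly configured EMR node this is expected to print an empty list.
print("Missing CLIs:", missing_tools(["aws", "hadoop"]))
```

Returning the missing names, rather than printing paths, makes it easy to fail fast before the upload and job steps.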


In [5]:
run(['which','aws'], check=False)
run(['which','hadoop'], check=False)

STREAMING_JAR = Path('/usr/lib/hadoop-mapreduce/hadoop-streaming.jar')
print('Streaming jar exists:', STREAMING_JAR.exists(), '-', str(STREAMING_JAR))
if not STREAMING_JAR.exists():
    print('\n⚠️ Streaming jar path not found. Try:')
    print('   sudo find /usr/lib -name "*streaming*.jar" | head')


» which aws
/usr/bin/aws

» which hadoop
/usr/bin/hadoop

Streaming jar exists: True - /usr/lib/hadoop-mapreduce/hadoop-streaming.jar

## 2) Create sample input data


In [6]:
logs = textwrap.dedent('''\
Hello world hello
MapReduce makes scaling easier
Hello EMR world
Race conditions happen without synchronization
''')
with open('logs.txt','w',encoding='utf-8') as f:
    f.write(logs)
print(logs)


Hello world hello
MapReduce makes scaling easier
Hello EMR world
Race conditions happen without synchronization

## 3) Write the mapper and reducer
- Mapper emits `(word, 1)`
- Reducer sums counts per word
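
The two roles above can be sketched in memory before any files are written. This is a simplified illustration, not the scripts used by the job: `map_words` and `reduce_sorted` are hypothetical names, and `sorted()` stands in for Hadoop's shuffle/sort phase. It also shows why the reducer depends on its input being sorted by key.

```python
import re
from itertools import groupby

WORD_RE = re.compile(r"[A-Za-z0-9']+")

def map_words(lines):
    """Emit (word, 1) for every lowercase token, like the mapper."""
    for line in lines:
        for word in WORD_RE.findall(line.lower()):
            yield word, 1

def reduce_sorted(pairs):
    """Sum counts over runs of equal keys; correct only for key-sorted input."""
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)

sample = ["Hello world hello", "Hello EMR world"]

# sorted() plays the role of the shuffle/sort between map and reduce.
counts = dict(reduce_sorted(sorted(map_words(sample))))
print(counts)  # {'emr': 1, 'hello': 3, 'world': 2}

# Without sorting, the same key shows up in several separate runs.
unsorted_runs = list(reduce_sorted(map_words(sample)))
print(unsorted_runs[:3])  # [('hello', 1), ('world', 1), ('hello', 2)]
```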


In [7]:
mapper_py = textwrap.dedent('''\
    #!/usr/bin/env python3
    import sys
    import re

    WORD_RE = re.compile(r"[A-Za-z0-9']+")

    for line in sys.stdin:
        for word in WORD_RE.findall(line.lower()):
            print(f"{word}\t1")
''').lstrip()

reducer_py = textwrap.dedent('''\
    #!/usr/bin/env python3
    import sys

    current_word = None
    current_count = 0

    for line in sys.stdin:
        line = line.strip()
        if not line:
            continue
        word, count = line.split("\t", 1)
        count = int(count)

        if current_word == word:
            current_count += count
        else:
            if current_word is not None:
                print(f"{current_word}\t{current_count}")
            current_word = word
            current_count = count

    if current_word is not None:
        print(f"{current_word}\t{current_count}")
''').lstrip()

with open('mapper.py','w',encoding='utf-8') as f:
    f.write(mapper_py)
with open('reducer.py','w',encoding='utf-8') as f:
    f.write(reducer_py)

run(['chmod','+x','mapper.py','reducer.py'], check=False)
print('Wrote mapper.py and reducer.py')


» chmod +x mapper.py reducer.py
Wrote mapper.py and reducer.py

## 4) Quick local test (optional)


In [8]:
cmd = "cat logs.txt | ./mapper.py | sort | ./reducer.py | sort -k2,2nr | head"
print('»', cmd)
p = subprocess.run(cmd, shell=True, text=True, capture_output=True)
print(p.stdout)
if p.stderr:
    print(p.stderr)


» cat logs.txt | ./mapper.py | sort | ./reducer.py | sort -k2,2nr | head
hello	3
world	2
conditions	1
easier	1
emr	1
happen	1
makes	1
mapreduce	1
race	1
scaling	1

## 5) Upload input + code to S3


In [9]:
if 'YOUR_BUCKET_NAME' in S3_BUCKET:
    raise ValueError('Set S3_BUCKET to a real bucket name first (or export S3_BUCKET).')

run(['aws','s3','cp','logs.txt', S3_INPUT])
run(['aws','s3','cp','mapper.py', S3_CODE])
run(['aws','s3','cp','reducer.py', S3_CODE])


print('Uploaded input and code to S3.')


» aws s3 cp logs.txt s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/input/
Completed 112 Bytes/112 Bytes (2.2 KiB/s) with 1 file(s) remaining
upload: ./logs.txt to s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/input/logs.txt

» aws s3 cp mapper.py s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/code/
Completed 182 Bytes/182 Bytes (3.2 KiB/s) with 1 file(s) remaining
upload: ./mapper.py to s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/code/mapper.py

» aws s3 cp reducer.py s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/code/
Completed 509 Bytes/509 Bytes (13.4 KiB/s) with 1 file(s) remaining
upload: ./reducer.py to s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/code/reducer.py

Uploaded input and code to S3.

## 6) Run the Hadoop Streaming job


In [10]:
# Optional cleanup so you can re-run without changing S3_OUTPUT
run(['aws','s3','rm', S3_OUTPUT, '--recursive'], check=False)

cmd = [
    'hadoop','jar', str(STREAMING_JAR),
    '-D','mapreduce.job.name=wordcount-streaming',
    '-files','mapper.py,reducer.py',
    '-mapper','mapper.py',
    '-reducer','reducer.py',
    '-input', S3_INPUT,
    '-output', S3_OUTPUT,
]
run(cmd)


» aws s3 rm s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/ --recursive
delete: s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/_SUCCESS
delete: s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/part-00000
delete: s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/part-00002
delete: s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/part-00001

» hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -D mapreduce.job.name=wordcount-streaming -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -input s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/input/ -output s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.4.1-amzn-4.jar] /tmp/streamjob8998808290342822771.jar tmpDir=null

2026-02-02 01:06:16,195 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-172-

## 7) Read results
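
Each reducer writes its own `part-0000N` file, and the partitioner sends every occurrence of a given key to the same reducer. A word therefore appears in exactly one part file, and the full result is the union of the parts. The sketch below illustrates this under stated assumptions: `partition` and `merge_parts` are hypothetical helpers, and `zlib.crc32` is only a stand-in for the hash that Hadoop applies to keys.

```python
import zlib

def partition(key, num_reducers):
    """Pick a reducer index for `key` (crc32 is a stand-in hash, not Hadoop's)."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers

def merge_parts(parts):
    """Combine per-reducer word counts into one dictionary."""
    merged = {}
    for part in parts:
        for word, count in part.items():
            merged[word] = merged.get(word, 0) + count
    return merged

counts = {"hello": 3, "world": 2, "emr": 1, "race": 1}
num_reducers = 3

# Route each key to one simulated reducer output.
parts = [{} for _ in range(num_reducers)]
for word, count in counts.items():
    parts[partition(word, num_reducers)][word] = count

# Every word lands in exactly one part, so merging recovers the full result.
assert sum(len(p) for p in parts) == len(counts)
print(merge_parts(parts) == counts)  # True
```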


In [11]:
print("=== Listing all reducer output files ===")
run(['aws','s3','ls', S3_OUTPUT])

print("\n" + "="*60)
print("DEMONSTRATING THE SYNCHRONIZATION ISSUE")
print("="*60)

# Collect data from ALL reducers
all_words = {}
for i in range(3):
    print(f"\n--- Reducer {i} Output (part-0000{i}) ---")
    
    p = subprocess.run(['aws','s3','cp', f"{S3_OUTPUT}part-0000{i}", '-'], 
                       text=True, capture_output=True, check=False)
    
    if p.returncode == 0:
        lines = p.stdout.strip().splitlines()
        print(f"First 10 words from this reducer:")
        for line in lines[:10]:
            print(line)
        print(f"Total words in this reducer: {len(lines)}")
        
        # Aggregate across all reducers
        for line in lines:
            if line:
                word, count = line.split('\t')
                all_words[word] = all_words.get(word, 0) + int(count)

# Now show the COMBINED results
print("\n" + "="*60)
print("COMBINED RESULTS (Manual Aggregation Required!)")
print("="*60)
print(f"Total unique words: {len(all_words)}")

# Top 10 words after combining all reducers
sorted_words = sorted(all_words.items(), key=lambda x: x[1], reverse=True)
print("\nTop 10 words (after aggregating all 3 reducers):")
for word, count in sorted_words[:10]:
    print(f"{word:20s} {count}")

print("\nThis is the SYNCHRONIZATION ISSUE: we had to manually combine")
print("results from all 3 reducers to get the full picture!")

=== Listing all reducer output files ===
» aws s3 ls s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output/
2026-02-02 01:06:37          0 _SUCCESS
2026-02-02 01:06:33       9840 part-00000
2026-02-02 01:06:34       9780 part-00001
2026-02-02 01:06:36       9154 part-00002


DEMONSTRATING THE SYNCHRONIZATION ISSUE

--- Reducer 0 Output (part-00000) ---
First 10 words from this reducer:
'as	1
11	4
1991	1
2	3
20	1
5	4
50	1
596	1
6221541	1
8	5
Total words in this reducer: 1037

--- Reducer 1 Output (part-00001) ---
First 10 words from this reducer:
0	3
000	1
12	1
1500	1
1887	1
2001	1
3	11
30	1
501	1
6	2
Total words in this reducer: 1037

--- Reducer 2 Output (part-00002) ---
First 10 words from this reducer:
1	47
2020	1
4	5
64	1
7	3
a	695
absence	1
accept	1
accessed	1
accessible	1
Total words in this reducer: 974

COMBINED RESULTS (Manual Aggregation Required!)
Total unique words: 3048

Top 10 words (after aggregating all 3 reducers):
the                  1839
and          

## 8) Experiment with Combiner

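A combiner pre-aggregates each mapper's output on the mapper node before it is shuffled to the reducers.
This reduces network traffic and can speed up the job. Reusing `reducer.py` as the combiner works here because summing counts is associative and commutative.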
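
The effect can be sketched locally with two simulated input splits. This is an illustration only: `map_split`, `combine`, and `reduce_all` are hypothetical names, and real Hadoop may run the combiner zero or more times per mapper, which is why it must not change the final result.

```python
from collections import Counter

def map_split(lines):
    """Emit (word, 1) pairs for one input split."""
    return [(word, 1) for line in lines for word in line.lower().split()]

def combine(pairs):
    """Pre-aggregate one mapper's output using the same sum as the reducer."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())

def reduce_all(pair_lists):
    """Final reduce over everything that was shuffled."""
    totals = Counter()
    for pairs in pair_lists:
        for word, count in pairs:
            totals[word] += count
    return dict(totals)

splits = [["hello world hello"], ["hello emr world"]]
mapped = [map_split(s) for s in splits]
combined = [combine(m) for m in mapped]

records_without = sum(len(m) for m in mapped)   # pairs shuffled without a combiner
records_with = sum(len(c) for c in combined)    # pairs shuffled with a combiner
print(records_without, records_with)            # 6 5
print(reduce_all(mapped) == reduce_all(combined))  # True
```

The saving grows with how often words repeat within a split; on this tiny sample only one record is saved.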

In [12]:
# Separate S3 output path for the combiner run (this could also be defined in Section 0)
S3_OUTPUT_COMBINER = f"{S3_BASE}/output_combiner/"

# Run job with combiner
run(['aws','s3','rm', S3_OUTPUT_COMBINER, '--recursive'], check=False)

cmd = [
    'hadoop','jar', str(STREAMING_JAR),
    '-D','mapreduce.job.name=wordcount-with-combiner',
    '-D','mapreduce.job.reduces=3',
    '-files','mapper.py,reducer.py',
    '-mapper','mapper.py',
    '-reducer','reducer.py',
    '-combiner','reducer.py',  # THIS IS THE NEW LINE!
    '-input', S3_INPUT,
    '-output', S3_OUTPUT_COMBINER,
]
run(cmd)

» aws s3 rm s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output_combiner/ --recursive
» hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -D mapreduce.job.name=wordcount-with-combiner -D mapreduce.job.reduces=3 -files mapper.py,reducer.py -mapper mapper.py -reducer reducer.py -combiner reducer.py -input s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/input/ -output s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output_combiner/
packageJobJar: [] [/usr/lib/hadoop/hadoop-streaming-3.4.1-amzn-4.jar] /tmp/streamjob18072558437374551796.jar tmpDir=null

2026-02-02 01:07:01,477 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager at ip-172-31-35-47.ec2.internal/172.31.35.47:8032
2026-02-02 01:07:01,533 INFO client.AHSProxy: Connecting to Application History server at ip-172-31-35-47.ec2.internal/172.31.35.47:10200
2026-02-02 01:07:01,556 INFO client.DefaultNoHARMFailoverProxyProvider: Connecting to ResourceManager 

In [13]:
print("=== Combiner Job Results ===")
run(['aws','s3','ls', S3_OUTPUT_COMBINER])

# Aggregate combiner results
combiner_words = {}
for i in range(3):
    p = subprocess.run(['aws','s3','cp', f"{S3_OUTPUT_COMBINER}part-0000{i}", '-'], 
                       text=True, capture_output=True, check=False)
    if p.returncode == 0:
        for line in p.stdout.strip().splitlines():
            if line:
                word, count = line.split('\t')
                combiner_words[word] = combiner_words.get(word, 0) + int(count)

print(f"Total unique words with combiner: {len(combiner_words)}")

sorted_combiner = sorted(combiner_words.items(), key=lambda x: x[1], reverse=True)
print("\nTop 10 words with combiner:")
for word, count in sorted_combiner[:10]:
    print(f"{word:20s} {count}")

print(f"\nResults match without combiner: {all_words == combiner_words}")
print("The combiner gives SAME results but with better performance!")

=== Combiner Job Results ===
» aws s3 ls s3://aws-logs-346690756907-us-east-1/mapreduce/wordcount_demo/output_combiner/
2026-02-02 01:07:28          0 _SUCCESS
2026-02-02 01:07:25       9840 part-00000
2026-02-02 01:07:25       9780 part-00001
2026-02-02 01:07:28       9154 part-00002

Total unique words with combiner: 3048

Top 10 words with combiner:
the                  1839
and                  942
to                   811
a                    695
of                   638
it                   610
she                  553
i                    546
you                  486
said                 462

Results match without combiner: True
The combiner gives SAME results but with better performance!

## Troubleshooting
- **S3 AccessDenied**: the EMR role needs `s3:ListBucket`, `s3:GetObject`, and `s3:PutObject`, plus `s3:DeleteObject` for the cleanup step (`aws s3 rm`).
- **Output already exists**: delete it (`aws s3 rm ... --recursive`) or change `PREFIX`.
- **Jar path missing**: locate with `sudo find /usr/lib -name "*streaming*.jar" | head`.
