# Mapreduce assignment

## Imports and set dirs/vars

In [1]:
import os
import sys
import re
import subprocess
import pandas as pd
import numpy as np
from spellchecker import SpellChecker
from collections import Counter

In [2]:
# Set working directory

WD = "/Users/drmariamisayan/mapreducer_assignment"

# Create if doesn't exist
os.makedirs(WD, exist_ok=True)

In [3]:
# Change directory to WD 
os.chdir(WD)

In [4]:
# Check contents
os.system("ls -1")

[31mfile1.txt[m[m
[31mfile2.txt[m[m
[31mmapper1.py[m[m
[31mmapper2.py[m[m
[30m[43moutput1[m[m
[30m[43moutput2[m[m
[31mreducer1.py[m[m
[31mreducer2.py[m[m


0

In [18]:
# Check contents of files
os.system("head **/*.txt")

==> input1/file1.txt <==
as pale as he was, her long blonde hair hanging down her back, but beneath
the table her slim fingers closed briefly on his wrist. At her touch, Malfoy put
his hand into his robes, withdrew a wand, and passed it along to V oldemort,
who held it up in front of his red eyes, examining it closely.
“What is it?”
“Elm, my Lord,
” whispered Malfoy.
“And the core?”
“Dragon — dragon heartstring.
”

==> input2/file2.txt <==
large poster of the Wizarding band the Weird Sisters on one wall, and a
picture of Gwenog Jones, Captain of the all-witch Quidditch team the
Holyhead Harpies, on the other. A desk stood facing the open window, which
looked out over the orchard where he and Ginny had once played two-a-side
Quidditch with Ron and Hermione, and which now housed a large, pearly
white marquee. The golden flag on top was level with Ginny’s window.
Ginny looked up into Harry’s face, took a deep breath, and said,
“Happy
seventeenth.
”


0

In [84]:
# Create directories for outputs
for num in (1,2):
    os.makedirs(f"{WD}/output{num}", exist_ok=True)

In [25]:
# Check contents and new dirs
os.system("ls -1R")

[1m[36minput1[m[m
[1m[36minput2[m[m
[31mmapper1.py[m[m
[31mmapper2.py[m[m
[30m[43moutput1[m[m
[30m[43moutput2[m[m
[31mreducer1.py[m[m
[31mreducer2.py[m[m

./input1:
[31mfile1.txt[m[m

./input2:
[31mfile2.txt[m[m

./output1:

./output2:


0

## Start HDFS and copy files over

In [8]:
## Set path variables
# Set user
user = os.environ["USER"]

# Set Hadoop home
os.environ["HADOOP_HOME"] = "/usr/local/Cellar/hadoop/3.4.1/libexec"

# Add sbin to PATH
os.environ["PATH"] = os.path.join(os.environ["HADOOP_HOME"], "sbin") + os.pathsep + os.environ["PATH"]

In [None]:
os.system("start-all.sh")

In [None]:
# Create directories for outputs
for num in (1,2):
    os.system(f"hdfs dfs -mkdir -p /user/{user}/input{num}")

In [28]:
# Copy files to new input dir
for num in (1,2):
    os.system(f"hdfs dfs -put ./input{num}/file{num}.txt /user/{user}/input{num}")

2025-09-21 22:59:12,630 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-09-21 22:59:16,154 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [29]:
# Check contents
os.system(f"hdfs dfs -ls -R /user/{user}")

2025-09-21 22:59:19,583 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


drwxr-xr-x   - drmariamisayan supergroup          0 2025-09-21 22:55 /user/drmariamisayan/input
-rw-r--r--   1 drmariamisayan supergroup      22335 2025-09-21 22:55 /user/drmariamisayan/input/file1.txt
-rw-r--r--   1 drmariamisayan supergroup      20417 2025-09-21 22:55 /user/drmariamisayan/input/file2.txt
drwxr-xr-x   - drmariamisayan supergroup          0 2025-09-21 22:59 /user/drmariamisayan/input1
-rw-r--r--   1 drmariamisayan supergroup      22335 2025-09-21 22:59 /user/drmariamisayan/input1/file1.txt
drwxr-xr-x   - drmariamisayan supergroup          0 2025-09-21 22:59 /user/drmariamisayan/input2
-rw-r--r--   1 drmariamisayan supergroup      20417 2025-09-21 22:59 /user/drmariamisayan/input2/file2.txt


0

## Scripts

### Check scripts for file 1

In [30]:
os.system(f"cat mapper1.py")

#!/usr/bin/env python3
import sys, re

for line in sys.stdin:
    for word in line.strip().split():
        # keep only letters and apostrophes
        word = re.sub(r"[^a-zA-Z']+", "", word).lower()
        if word:
            print(f"{word}\t1")


0

In [31]:
os.system(f"cat reducer1.py")

#!/usr/bin/env python3
import sys

counts = {}

for line in sys.stdin:
    try:
        word, n = line.strip().split('\t')
        counts[word] = counts.get(word, 0) + int(n)
    except:
        continue

for word, total in counts.items():
    print(f"{word}\t{total}")



0

### Check scripts for file 2

In [32]:
os.system(f"cat mapper2.py")

#!/usr/bin/env python3
import sys, re
from spellchecker import SpellChecker

spell = SpellChecker()

for line in sys.stdin:
    words = re.findall(r"[a-zA-Z]+", line.lower())
    for w in words:
        if w not in spell:   # non-English
            print(f"{w}\t1")



0

In [33]:
os.system(f"cat reducer2.py")

#!/usr/bin/env python3
import sys
from collections import defaultdict

counts = defaultdict(int)
for line in sys.stdin:
    word, n = line.strip().split('\t')
    counts[word] += int(n)

for word, total in counts.items():
    print(f"{word}\t{total}")



0

## Run hadoop

In [93]:
# Run hadoop for file1.txt with mapper1.py and reducer1.py

def runMapReduce(input_dir, output_dir, mapper, reducer, 
                 hadoop_streamer="/usr/local/Cellar/hadoop/3.4.1/libexec/share/hadoop/tools/lib/hadoop-streaming-3.4.1.jar"):

    # Remove old output
    remover = (["hdfs", "dfs", "-rm", "-r", output_dir])
    subprocess.run(remover, check=True)
    
    # Run hadoop command in a subprocess
    processor = (["hadoop", "jar", hadoop_streamer,
                        "-input", input_dir,
                        "-output",output_dir, 
                        "-mapper", mapper,
                        "-reducer", reducer,
                        "-file", mapper,
                        "-file", reducer,
                        "-numReduceTasks", "1"
                       ])
    # Run command
    subprocess.run(processor, check=True)

## File 1

In [76]:
runMapReduce("/user/drmariamisayan/input1/file1.txt", "/user/drmariamisayan/output1", "mapper1.py", "reducer1.py")

2025-09-21 23:18:59,075 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Deleted /user/drmariamisayan/output1


2025-09-21 23:19:02,056 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2025-09-21 23:19:02,435 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


packageJobJar: [mapper1.py, reducer1.py] [] /var/folders/n_/hr553kvx21n_pmvyjztgw5bh0000gn/T/streamjob18313799321028312200.jar tmpDir=null


2025-09-21 23:19:03,672 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-09-21 23:19:03,937 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-09-21 23:19:03,937 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2025-09-21 23:19:03,969 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2025-09-21 23:19:05,010 INFO mapred.FileInputFormat: Total input files to process : 1
2025-09-21 23:19:05,279 INFO mapreduce.JobSubmitter: number of splits:1
2025-09-21 23:19:05,691 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local288401792_0001
2025-09-21 23:19:05,692 INFO mapreduce.JobSubmitter: Executing with tokens: []
2025-09-21 23:19:06,470 INFO mapred.LocalDistributedCacheManager: Localized file:/Users/drmariamisayan/mapreducer_assignment/mapper1.py as file:/usr/local/Cellar/hadoop/hdfs/tmp/mapred/local/job_local288401792_0001_2bfc7dba-e3fd-4e70-936a-05e210316257/mapper1.py
2025-09

In [None]:
# Get the output to local dir
os.system(f"rm -r {WD}/output1/part-00000")
os.system(f"hdfs dfs -get {output1_dir}/part-00000 {WD}/output1/part-00000")

In [106]:
file1 = pd.read_csv(f"{WD}/output1/part-00000", sep='\t', header=None, names=["word", "count"])
file1

Unnamed: 0,word,count
0,1945,1
1,a,61
2,aberforth,4
3,about,10
4,absently,1
5,accept,1
6,accompanying,2
7,accorded,1
8,according,1
9,achievements,1


## File 2

In [107]:
runMapReduce("/user/drmariamisayan/input2/file2.txt", "/user/drmariamisayan/output2", "mapper2.py", "reducer2.py")

2025-09-21 23:29:00,578 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Deleted /user/drmariamisayan/output2


2025-09-21 23:29:03,557 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2025-09-21 23:29:03,927 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


packageJobJar: [mapper2.py, reducer2.py] [] /var/folders/n_/hr553kvx21n_pmvyjztgw5bh0000gn/T/streamjob4440137042879722557.jar tmpDir=null


2025-09-21 23:29:05,129 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-09-21 23:29:05,401 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-09-21 23:29:05,401 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2025-09-21 23:29:05,437 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2025-09-21 23:29:06,373 INFO mapred.FileInputFormat: Total input files to process : 1
2025-09-21 23:29:06,695 INFO mapreduce.JobSubmitter: number of splits:1
2025-09-21 23:29:07,114 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1051865084_0001
2025-09-21 23:29:07,114 INFO mapreduce.JobSubmitter: Executing with tokens: []
2025-09-21 23:29:07,731 INFO mapred.LocalDistributedCacheManager: Localized file:/Users/drmariamisayan/mapreducer_assignment/mapper2.py as file:/usr/local/Cellar/hadoop/hdfs/tmp/mapred/local/job_local1051865084_0001_6d96a9de-0a39-4571-bf4c-c47d4c33ed76/mapper2.py
2025-

In [108]:
# Get the output to local dir
os.system(f"rm -r {WD}/output2/part-00000")
os.system(f"hdfs dfs -get {output2_dir}/part-00000 {WD}/output2/part-00000")

2025-09-21 23:29:12,028 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


0

In [109]:
file2 = pd.read_csv(f"{WD}/output2/part-00000", sep='\t', header=None, names=["word", "count"])
file2

Unnamed: 0,word,count
0,aguely,1
1,albus,2
2,anythin,1
3,beedle,1
4,bilius,1
5,charlie's,1
6,couldn,1
7,crabapple,1
8,d'yeh,1
9,delacour,2


### Check all contents

In [87]:
# Check contents
os.system(f"hdfs dfs -ls -R")

2025-09-21 23:21:36,449 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


drwxr-xr-x   - drmariamisayan supergroup          0 2025-09-21 22:55 input
-rw-r--r--   1 drmariamisayan supergroup      22335 2025-09-21 22:55 input/file1.txt
-rw-r--r--   1 drmariamisayan supergroup      20417 2025-09-21 22:55 input/file2.txt
drwxr-xr-x   - drmariamisayan supergroup          0 2025-09-21 22:59 input1
-rw-r--r--   1 drmariamisayan supergroup      22335 2025-09-21 22:59 input1/file1.txt
drwxr-xr-x   - drmariamisayan supergroup          0 2025-09-21 22:59 input2
-rw-r--r--   1 drmariamisayan supergroup      20417 2025-09-21 22:59 input2/file2.txt
drwxr-xr-x   - drmariamisayan supergroup          0 2025-09-21 23:19 output1
-rw-r--r--   1 drmariamisayan supergroup          0 2025-09-21 23:19 output1/_SUCCESS
-rw-r--r--   1 drmariamisayan supergroup      12105 2025-09-21 23:19 output1/part-00000


0