In [1]:
# Pin the release through the permanent Apache archive. The `stable/` directory
# on dlcdn.apache.org only carries the current stable release, so a versioned
# file name under it stops resolving once a newer release is published.
HADOOP_URL = "https://archive.apache.org/dist/hadoop/common/hadoop-3.4.0/hadoop-3.4.0.tar.gz"

import os
import subprocess
import tarfile

import requests

def download_and_extract_targz(url):
    """Download a ``.tar.gz`` archive and unpack it into the current directory.

    The folder name is derived from the last URL segment with its 7-character
    ``.tar.gz`` suffix removed (the archive is assumed to unpack into a folder
    of that name). That name is exported as the ``HADOOP_HOME`` environment
    variable in every case, including when the download is skipped or fails.

    Nothing is downloaded when the folder already exists, so re-running the
    cell is cheap.

    Args:
        url: HTTP(S) URL of the ``.tar.gz`` archive.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    filename = url.rsplit('/', 1)[-1]
    # Drop the trailing ".tar.gz" to get the folder name
    HADOOP_HOME = filename[:-7]
    # set HADOOP_HOME environment variable
    os.environ['HADOOP_HOME'] = HADOOP_HOME
    # Check for an existing folder BEFORE touching the network.
    if os.path.isdir(HADOOP_HOME):
        print("Not downloading, Hadoop folder {} already exists".format(HADOOP_HOME))
        return

    # Stream the body to disk instead of holding the whole archive in memory;
    # the timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.get(url, stream=True, timeout=60)
    try:
        if response.status_code != 200:
            print(f"Failed to download tar.gz archive. Status code: {response.status_code}")
            return
        with open(filename, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                file.write(chunk)
    finally:
        response.close()

    with tarfile.open(filename, 'r:gz') as tar_ref:
        # Get the names of all members (files and directories) in the archive
        all_members = tar_ref.getnames()
        if hasattr(tarfile, 'data_filter'):
            # Extraction filters are available: refuse members that would
            # escape the destination directory (absolute paths, "..", ...).
            tar_ref.extractall(path='.', filter='data')
        else:
            tar_ref.extractall(path='.')

    # Report the top-level directory (first path component of the first member)
    if all_members:
        top_level_directory = all_members[0].split('/', 1)[0]
        print(f"tar.gz archive downloaded and extracted successfully. Contents saved at: {top_level_directory}")


# Fetch and unpack Hadoop (skipped when the folder is already there).
# HADOOP_HOME is set by download_and_extract_targz as a side effect.
download_and_extract_targz(HADOOP_URL)
print("HADOOP_HOME is {}".format(os.environ['HADOOP_HOME']))

# Put Hadoop's bin directory first on PATH so `hdfs` and `mapred` resolve.
# The membership check keeps the cell idempotent: re-running it must not
# prepend the same entry again and again.
# NOTE: HADOOP_HOME is a relative path here, so this PATH entry only resolves
# from the notebook's working directory.
hadoop_bin = os.path.join(os.environ['HADOOP_HOME'], 'bin')
current_path = os.environ.get('PATH', '')
if hadoop_bin not in current_path.split(os.pathsep):
    os.environ['PATH'] = os.pathsep.join(filter(None, [hadoop_bin, current_path]))
print("PATH is {}".format(os.environ['PATH']))
import shutil

# set variable JAVA_HOME (install Java if necessary)
def is_java_installed():
    """Locate the ``java`` executable and export ``JAVA_HOME`` accordingly.

    Returns:
        The JAVA_HOME path (str) when ``java`` is found on PATH, otherwise
        ``None``. ``JAVA_HOME`` is left untouched when Java is missing.
    """
    java_executable = shutil.which("java")
    if java_executable is None:
        # shutil.which returns None when nothing is found; passing that on to
        # os.path.realpath would raise TypeError instead of reporting "missing".
        return None
    # Resolve symlinks to the real binary, then go up from <JAVA_HOME>/bin/java.
    java_home = os.path.dirname(os.path.dirname(os.path.realpath(java_executable)))
    os.environ['JAVA_HOME'] = java_home
    return java_home

def install_java(java_version='openjdk-19-jre-headless'):
    """Install a Java runtime with ``apt`` and export ``JAVA_HOME``.

    Args:
        java_version: Name of the apt package to install. Alternatives include
            ``'openjdk-11-jre-headless'``, ``'openjdk-17-jre-headless'``,
            ``'openjdk-18-jre-headless'`` and ``'default-jre'``.

    Returns:
        None. Failures are reported with ``print`` rather than raised; in that
        case ``JAVA_HOME`` is not modified.
    """
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        # Argument list instead of a shell string: no shell parsing involved.
        subprocess.run(
            ["apt", "install", "-y", java_version],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print("Command failed with return code {}".format(e.returncode))
        print("stdout: {}".format(e.stdout))
        return
    except OSError as e:
        # Raised when the `apt` executable itself cannot be started
        print("Command failed: {}".format(e))
        return

    print("Done installing Java {}".format(java_version))
    java_executable = shutil.which("java")
    if java_executable is None:
        # The package installed but `java` is still not on PATH
        print("java executable not found on PATH after installation; JAVA_HOME not set")
        return
    # Resolve symlinks to the real binary, then go up from <JAVA_HOME>/bin/java.
    os.environ['JAVA_HOME'] = os.path.dirname(os.path.dirname(os.path.realpath(java_executable)))
    print("JAVA_HOME is {}".format(os.environ['JAVA_HOME']))

# Make sure a Java runtime is available; install one only when none is found
if not is_java_installed():
    print("Installing Java")
    install_java()
else:
    print("Java is already installed: {}".format(os.environ['JAVA_HOME']))

ZIP file downloaded and extracted successfully. Contents saved at: hadoop-3.4.0
HADOOP_HOME is hadoop-3.4.0
PATH is hadoop-3.4.0/bin:/opt/bin:/usr/local/nvidia/bin:/usr/local/cuda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/tools/node/bin:/tools/google-cloud-sdk/bin
Java is already installed: /usr/lib/jvm/java-11-openjdk-amd64


# Simple distributed wordcount with MapReduce

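Check that the input file `file.txt` exists in the current working directory and view its size. This notebook does not create the file: upload or create `file.txt` first, otherwise every following step fails with `No such file or directory`.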

In [2]:
!ls -hal file.txt

ls: cannot access 'file.txt': No such file or directory


Copy file to HDFS

In [3]:
!hdfs dfs -put -f file.txt

put: `file.txt': No such file or directory


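Erase the `result` folder left over from a previous run (the MapReduce job refuses to start if its output folder already exists).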

In [4]:
!hdfs dfs -rm -R result 2>/dev/null

Run the bash wordcount command `wc` in parallel on the distributed file.

In [5]:
!mapred streaming \
  -input file.txt \
  -output result \
  -mapper /bin/cat \
  -reducer /usr/bin/wc

2024-05-06 15:38:15,089 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2024-05-06 15:38:15,379 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2024-05-06 15:38:15,379 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2024-05-06 15:38:15,407 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2024-05-06 15:38:15,746 INFO mapreduce.JobSubmitter: Cleaning up the staging area file:/tmp/hadoop/mapred/staging/root171776672/.staging/job_local171776672_0001
2024-05-06 15:38:15,763 ERROR streaming.StreamJob: Error Launching job : Input path does not exist: file:/content/file.txt
Streaming Command Failed!


Check result of MapReduce job

In [6]:
!hdfs dfs -cat result/part*

cat: `result/part*': No such file or directory


Check that the word count is correct by comparing with `wc` on local host (warning: do not try with too large files).

In [7]:
!wc file.txt

wc: file.txt: No such file or directory
