1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.



In [2]:
def read_hadoop_config(file_path):
    """Return the property names declared in a Hadoop XML configuration file.

    The file is scanned line by line; every line that (after trimming
    whitespace) starts with ``<name>`` contributes the text found between
    ``<name>`` and the first ``</name>`` on that line.

    Args:
        file_path: Path of the configuration file to read.

    Returns:
        A list of property names, in the order they appear in the file.

    Raises:
        OSError: If the file cannot be opened.
    """
    open_tag = '<name>'
    close_tag = '</name>'
    components = []
    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line.startswith(open_tag):
                continue
            # str.lstrip/rstrip remove *character sets*, not substrings, and
            # would also eat leading/trailing letters of the property name
            # (e.g. "mapreduce.framework.name" -> "preduce.framework.").
            # Slice the tag off and cut at the closing tag instead.
            remainder = line[len(open_tag):]
            component = remainder.partition(close_tag)[0].strip()
            components.append(component)
    return components

# Driver for exercise 1: read the configuration file and print one
# extracted name per line. The path below is a placeholder.
config_file = '/path/to/hadoop/conf/hadoop-site.xml'
core_components = read_hadoop_config(config_file)
print("Core components of Hadoop:")
for component in core_components:
    print(component)


2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.


In [2]:
from hdfs import InsecureClient

def get_directory_size(hdfs_url, directory_path):
    """Return the total size in bytes of all files under an HDFS directory.

    Sub-directories are traversed recursively and the ``length`` of every
    non-directory entry is summed.

    Args:
        hdfs_url: WebHDFS base URL, passed to ``InsecureClient``.
        directory_path: HDFS directory whose contents are measured.

    Returns:
        The summed ``length`` of every file found below ``directory_path``.
    """
    # One client is enough for the whole traversal; the recursion below
    # reuses it instead of opening a new one per sub-directory.
    client = InsecureClient(hdfs_url)

    def _size_of(path):
        total = 0
        # With status=True, list() yields (name, status) pairs, not bare
        # status dicts, so the pair has to be unpacked before indexing.
        for name, status in client.list(path, status=True):
            if status['type'] == 'DIRECTORY':
                # rstrip avoids a doubled slash when path is "/" or ends in "/".
                total += _size_of(path.rstrip('/') + '/' + name)
            else:
                total += status['length']
        return total

    return _size_of(directory_path)

# Driver for exercise 2: measure one directory and report the total in bytes.
# The URL and path below are example values.
hdfs_url = 'http://localhost:9870'
directory_path = '/user/username/data'
total_size = get_directory_size(hdfs_url, directory_path)
print("Total file size in HDFS directory:", total_size, "bytes")


3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.


In [None]:
from collections import Counter
import multiprocessing

def map_reduce_word_count(file_path, num_top_words):
    """Print the most frequent words of a text file, one ``word count`` per line.

    Map phase: every line of the file is handed to ``count_words`` in a pool
    of worker processes. Reduce phase: the per-line counters are merged into
    a single counter, from which the top entries are printed.

    Args:
        file_path: Path of the text file to analyse.
        num_top_words: How many of the most common words to print.
    """
    # The pool is used as a context manager so its worker processes are
    # released even if the map phase raises; the original never closed it.
    with open(file_path, 'r') as file, multiprocessing.Pool() as pool:
        results = pool.map(count_words, file)

    word_counts = Counter()
    for result in results:
        # update() adds counts in place; "+=" with a fresh Counter copy
        # did the same work with an extra allocation per line.
        word_counts.update(result)

    for word, count in word_counts.most_common(num_top_words):
        print(word, count)

def count_words(line):
    """Map step: tally the whitespace-separated tokens of a single line.

    Args:
        line: One line of text.

    Returns:
        A ``Counter`` mapping each token to its number of occurrences.
    """
    tally = Counter()
    # split() with no argument already ignores leading/trailing whitespace.
    for token in line.split():
        tally[token] += 1
    return tally

# Driver for exercise 3. The guard keeps worker processes that re-import
# this module (the "spawn" start method) from launching the job again.
if __name__ == "__main__":
    file_path = '/path/to/large_text_file.txt'
    num_top_words = 10
    map_reduce_word_count(file_path, num_top_words)


4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.


In [None]:
import requests

def check_hadoop_health(namenode_host, namenode_port):
    """Print the NameNode state and a live/dead summary of its DataNodes.

    Both pieces of information are read from the NameNode's ``/jmx``
    endpoint: the state from the ``NameNodeStatus`` bean and the DataNode
    lists from the ``LiveNodes`` / ``DeadNodes`` attributes of the
    ``NameNodeInfo`` bean.

    Args:
        namenode_host: Host name of the NameNode web interface.
        namenode_port: Port of the NameNode web interface.

    Raises:
        requests.RequestException: If a JMX request fails or returns an
            HTTP error status.
        RuntimeError: If a queried bean is missing from the response.
    """
    import json  # local import so this block does not depend on file-level imports

    jmx_url = f"http://{namenode_host}:{namenode_port}/jmx"

    def fetch_bean(bean_name):
        # Return the first bean matching bean_name, failing loudly if absent
        # instead of raising a bare IndexError on an empty "beans" list.
        response = requests.get(f"{jmx_url}?qry={bean_name}", timeout=10)
        response.raise_for_status()
        beans = response.json().get('beans', [])
        if not beans:
            raise RuntimeError(f"JMX bean not found: {bean_name}")
        return beans[0]

    def decode_nodes(raw):
        # The node maps are published as JSON-encoded strings.
        if isinstance(raw, str):
            return json.loads(raw) if raw else {}
        return raw or {}

    # The NameNode does not expose a "DataNodeInfo" bean, and "State" is an
    # attribute of NameNodeStatus rather than NameNodeInfo.
    status_bean = fetch_bean("Hadoop:service=NameNode,name=NameNodeStatus")
    info_bean = fetch_bean("Hadoop:service=NameNode,name=NameNodeInfo")

    namenode_status = status_bean['State']
    live_nodes = decode_nodes(info_bean.get('LiveNodes'))
    dead_nodes = decode_nodes(info_bean.get('DeadNodes'))

    print("NameNode status:", namenode_status)
    print("DataNode status:", f"{len(live_nodes)} live, {len(dead_nodes)} dead")
    for node_name, node_info in live_nodes.items():
        print(f"  {node_name}: {node_info.get('adminState', 'unknown')}")
    for node_name in dead_nodes:
        print(f"  {node_name}: dead")

# Driver for exercise 4: query the NameNode web interface on localhost.
# NOTE(review): other cells in this file address the NameNode on port 9870;
# confirm which web port this cluster actually uses.
namenode_host = 'localhost'
namenode_port = 50070
check_hadoop_health(namenode_host, namenode_port)


5. Develop a Python program that lists all the files and directories in a specific HDFS path.


In [None]:
from hdfs import InsecureClient

def list_hdfs_path(hdfs_url, path):
    """Print every entry directly under an HDFS path as ``TYPE: full/path``.

    Args:
        hdfs_url: WebHDFS base URL, passed to ``InsecureClient``.
        path: HDFS directory to list (not recursive).
    """
    client = InsecureClient(hdfs_url)
    # rstrip avoids a doubled slash when path is "/" or ends in "/".
    base = path.rstrip('/')

    # With status=True, list() yields (name, status) pairs. The status dict
    # carries the entry type but no 'path' key, so the full path is rebuilt
    # from the listed directory and the entry name.
    for name, status in client.list(path, status=True):
        item_type = status['type']
        item_path = f"{base}/{name}"
        print(f"{item_type}: {item_path}")

# Driver for exercise 5: list the entries of one example directory.
hdfs_url = 'http://localhost:9870'
path = '/user/username/data'
list_hdfs_path(hdfs_url, path)


6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.


In [None]:
import requests

def analyze_datanode_storage_utilization(namenode_host, namenode_port):
    """Print the three largest and three smallest live DataNodes by capacity.

    The live DataNode map is read from the ``LiveNodes`` attribute of the
    NameNode's ``NameNodeInfo`` JMX bean; each node's ``capacity`` value is
    used for the ranking. With fewer than six nodes the two lists overlap.

    Args:
        namenode_host: Host name of the NameNode web interface.
        namenode_port: Port of the NameNode web interface.

    Raises:
        requests.RequestException: If the JMX request fails or returns an
            HTTP error status.
    """
    import json  # local import so this block does not depend on file-level imports

    namenode_url = f"http://{namenode_host}:{namenode_port}"
    datanode_info_url = f"{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"

    response = requests.get(datanode_info_url, timeout=10)
    response.raise_for_status()
    datanodes = response.json()['beans'][0]['LiveNodes']
    # LiveNodes is published as a JSON-encoded string; decode it before
    # treating it as a mapping (calling .items() on the raw string fails).
    if isinstance(datanodes, str):
        datanodes = json.loads(datanodes) if datanodes else {}

    storage_capacities = [
        (datanode_id, datanode_info['capacity'])
        for datanode_id, datanode_info in datanodes.items()
    ]
    sorted_capacities = sorted(storage_capacities, key=lambda entry: entry[1])

    print("DataNodes with the highest storage capacities:")
    for datanode_id, storage_capacity in sorted_capacities[-3:]:
        print(f"DataNode: {datanode_id}, Storage Capacity: {storage_capacity} bytes")

    print("\nDataNodes with the lowest storage capacities:")
    for datanode_id, storage_capacity in sorted_capacities[:3]:
        print(f"DataNode: {datanode_id}, Storage Capacity: {storage_capacity} bytes")

# Driver for exercise 6: rank the DataNodes reported by the local NameNode.
# NOTE(review): other cells in this file address the NameNode on port 9870;
# confirm which web port this cluster actually uses.
namenode_host = 'localhost'
namenode_port = 50070
analyze_datanode_storage_utilization(namenode_host, namenode_port)


7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.


In [None]:
import requests
import time

def submit_hadoop_job(yarn_rm_host, yarn_rm_port, job_params):
    """Submit an application to YARN, poll it until it ends, and print its logs link.

    Steps:
      1. Ask the ResourceManager for a new application id.
      2. Submit ``job_params`` under that id.
      3. Poll the application state every five seconds until it is
         FINISHED, FAILED or KILLED, printing each observed state.
      4. On FINISHED, try to fetch the job-attempt ``logsLink`` through the
         application's tracking URL.

    Args:
        yarn_rm_host: Host name of the ResourceManager web service.
        yarn_rm_port: Port of the ResourceManager web service.
        job_params: Application submission context as a dict. It is not
            modified; any ``application-id`` it contains is overridden with
            the id issued in step 1.

    Raises:
        requests.RequestException: If requesting the application id or
            polling the application state fails at the HTTP level.
    """
    apps_url = f"http://{yarn_rm_host}:{yarn_rm_port}/ws/v1/cluster/apps"

    response = requests.post(f"{apps_url}/new-application")
    response.raise_for_status()
    app_id = response.json()['application-id']

    # The submission must carry the id that was just issued, and it is posted
    # to the apps collection itself (there is no "/apps/{id}/submit" resource).
    payload = {**job_params, "application-id": app_id}
    response = requests.post(apps_url, json=payload)

    if response.status_code != 202:
        print("Failed to submit the Hadoop job.")
        return

    print("Hadoop job submitted successfully.")
    print("Application ID:", app_id)

    # The 202 response has no body, so the tracking URL is taken from the
    # application status instead of from the submit response.
    status_url = f"{apps_url}/{app_id}"
    terminal_states = ('FINISHED', 'FAILED', 'KILLED')
    tracking_url = None

    while True:
        status_response = requests.get(status_url)
        status_response.raise_for_status()
        app = status_response.json()['app']
        status = app['state']
        print("Job status:", status)

        if tracking_url is None and app.get('trackingUrl'):
            tracking_url = app['trackingUrl']
            print("Tracking URL:", tracking_url)

        if status in terminal_states:
            break

        time.sleep(5)

    if status != 'FINISHED':
        return

    if not tracking_url:
        print("Job output: unavailable (no tracking URL reported)")
        return

    # MapReduce REST resources are keyed by job id ("job_..."), which shares
    # its numeric suffix with the application id ("application_...").
    job_id = app_id.replace('application_', 'job_', 1)
    output_url = f"{tracking_url.rstrip('/')}/ws/v1/mapreduce/jobs/{job_id}/jobattempts"
    # NOTE(review): once the application has finished, the endpoint behind
    # the tracking URL may no longer serve this resource; confirm where job
    # attempts should be read from on this cluster.
    try:
        output_response = requests.get(output_url)
        output_response.raise_for_status()
        output = output_response.json()['jobAttempts']['jobAttempt'][0]['logsLink']
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as exc:
        print("Job output could not be retrieved:", exc)
    else:
        print("Job output:", output)

# Driver for exercise 7: submit an example WordCount application to the
# ResourceManager on localhost.
yarn_rm_host = 'localhost'
yarn_rm_port = 8088
job_params = {
    # NOTE(review): submit_hadoop_job requests a fresh application id before
    # submitting; this hard-coded id may not match it - confirm it is needed.
    "application-id": "application_1621203960916_0001",
    "application-name": "WordCount",
    "am-container-spec": {
        "commands": {
            "command": "hadoop jar WordCount.jar input output"
        }
    }
}
submit_hadoop_job(yarn_rm_host, yarn_rm_port, job_params)


8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.


In [None]:
import requests
import time

def submit_hadoop_job_with_resource_tracking(yarn_rm_host, yarn_rm_port, job_params):
    """Submit an application to YARN and print its resource usage while it runs.

    Steps:
      1. Ask the ResourceManager for a new application id.
      2. Submit ``job_params`` (including any ``resource`` request) under
         that id.
      3. Poll the application every five seconds until it is FINISHED,
         FAILED or KILLED, printing the state and the resource figures
         reported in the application status.
      4. On FINISHED, try to fetch the job-attempt ``logsLink`` through the
         application's tracking URL.

    Args:
        yarn_rm_host: Host name of the ResourceManager web service.
        yarn_rm_port: Port of the ResourceManager web service.
        job_params: Application submission context as a dict. It is not
            modified; any ``application-id`` it contains is overridden with
            the id issued in step 1.

    Raises:
        requests.RequestException: If requesting the application id or
            polling the application state fails at the HTTP level.
    """
    apps_url = f"http://{yarn_rm_host}:{yarn_rm_port}/ws/v1/cluster/apps"

    response = requests.post(f"{apps_url}/new-application")
    response.raise_for_status()
    app_id = response.json()['application-id']

    # The submission must carry the id that was just issued, and it is posted
    # to the apps collection itself (there is no "/apps/{id}/submit" resource).
    payload = {**job_params, "application-id": app_id}
    response = requests.post(apps_url, json=payload)

    if response.status_code != 202:
        print("Failed to submit the Hadoop job.")
        return

    print("Hadoop job submitted successfully.")
    print("Application ID:", app_id)

    status_url = f"{apps_url}/{app_id}"
    terminal_states = ('FINISHED', 'FAILED', 'KILLED')
    # Usage figures are fields of the application status object; there is no
    # separate "/metrics/resource-usage" resource to query.
    usage_fields = (
        'allocatedMB',
        'allocatedVCores',
        'runningContainers',
        'memorySeconds',
        'vcoreSeconds',
    )

    while True:
        status_response = requests.get(status_url)
        status_response.raise_for_status()
        app = status_response.json()['app']
        status = app['state']
        print("Job status:", status)

        if status in terminal_states:
            break

        resources = {field: app.get(field) for field in usage_fields}
        print("Resource usage:", resources)

        time.sleep(5)

    if status != 'FINISHED':
        return

    tracking_url = app.get('trackingUrl')
    if not tracking_url:
        print("Job output: unavailable (no tracking URL reported)")
        return

    # MapReduce REST resources are keyed by job id ("job_..."), which shares
    # its numeric suffix with the application id ("application_...").
    job_id = app_id.replace('application_', 'job_', 1)
    output_url = f"{tracking_url.rstrip('/')}/ws/v1/mapreduce/jobs/{job_id}/jobattempts"
    # NOTE(review): once the application has finished, the endpoint behind
    # the tracking URL may no longer serve this resource; confirm where job
    # attempts should be read from on this cluster.
    try:
        output_response = requests.get(output_url)
        output_response.raise_for_status()
        output = output_response.json()['jobAttempts']['jobAttempt'][0]['logsLink']
    except (requests.RequestException, ValueError, KeyError, IndexError, TypeError) as exc:
        print("Job output could not be retrieved:", exc)
    else:
        print("Job output:", output)

# Driver for exercise 8: submit an example WordCount application with an
# explicit resource request for its application master.
yarn_rm_host = 'localhost'
yarn_rm_port = 8088
job_params = {
    "application-id": "application_1621203960916_0001",
    "application-name": "WordCount",
    "am-container-spec": {
        "commands": {
            "command": "hadoop jar WordCount.jar input output"
        }
    },
    "resource": {
        # The submission schema spells this key "vCores"; JSON keys are
        # case-sensitive, so "vcores" would not be recognised.
        "vCores": 2,
        "memory": 2048
    }
}
submit_hadoop_job_with_resource_tracking(yarn_rm_host, yarn_rm_port, job_params)


9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.

In [None]:
import subprocess
import time

def run_mapreduce_job(input_path, output_path, split_size):
    """Run the example MapReduce job with a given split size and time it.

    The job is launched through the ``hadoop jar`` command line and the
    elapsed time is printed once the command returns.

    Args:
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.
        split_size: Input split size in bytes.

    Returns:
        The elapsed time of the command in seconds.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    hadoop_command = [
        'hadoop', 'jar', 'example.jar', 'ExampleJob',
        '-D', f'mapreduce.input.fileinputformat.split.minsize={split_size}',
        # The split size is max(minsize, min(maxsize, block size)); pinning
        # maxsize as well makes sizes below the block size take effect too.
        '-D', f'mapreduce.input.fileinputformat.split.maxsize={split_size}',
        input_path, output_path
    ]

    # perf_counter is monotonic, so the measurement is not distorted by
    # wall-clock adjustments while the job is running.
    start_time = time.perf_counter()
    subprocess.run(hadoop_command, check=True)
    execution_time = time.perf_counter() - start_time

    print(f"Input split size: {split_size}, Execution time: {execution_time} seconds")
    return execution_time

# Driver for exercise 9: run the same job once per split size.
input_path = '/input/data.txt'
output_path = '/output/result'
split_sizes = [128 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024]

for split_size in split_sizes:
    # Each run gets its own output directory: a MapReduce job refuses to
    # start when its output path already exists, so reusing one path would
    # abort every run after the first.
    run_mapreduce_job(input_path, f"{output_path}_{split_size}", split_size)
