In [1]:
# 1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.

import configparser

def display_core_components(config_file):
    """Print the core Hadoop settings found in a configuration file.

    Hadoop ``*-site.xml`` files are XML documents whose root element is
    ``configuration`` and whose children are ``property`` elements holding a
    ``name`` and a ``value``. ``configparser`` cannot read that format, so the
    file is parsed as XML first. For backward compatibility, anything that is
    not such an XML document (including a missing file) is still handled as
    an INI file with a ``[core-site]`` section.

    Args:
        config_file: Path to the configuration file.

    Prints:
        One line per recognised property that is present, or a notice when
        no settings could be located.
    """
    # Imported locally so the function stays self-contained in its cell.
    import xml.etree.ElementTree as ET

    # (label, property name) pairs, in the order they are reported.
    reported_properties = (
        ("Filesystem Name: ", 'fs.defaultFS'),
        ("Resource Manager: ", 'yarn.resourcemanager.address'),
        ("MapReduce Framework: ", 'mapreduce.framework.name'),
    )

    try:
        root = ET.parse(config_file).getroot()
    except (ET.ParseError, OSError):
        # Not XML, or not readable: fall through to the INI handling below.
        root = None

    options = None
    if root is not None and root.tag == 'configuration':
        options = {}
        for prop in root.iter('property'):
            name = prop.findtext('name')
            if name is not None and name.strip():
                options[name.strip()] = (prop.findtext('value') or '').strip()
    else:
        config = configparser.ConfigParser()
        # ConfigParser.read() silently skips files it cannot open.
        config.read(config_file)
        if 'core-site' in config:
            options = config['core-site']

    if options is None:
        print("No 'core-site' section found in the configuration file.")
        return

    for label, key in reported_properties:
        if key in options:
            print(label, options[key])

# Path of the Hadoop configuration file to inspect.
# NOTE(review): this looks like a placeholder path; point it at a real file before running.
config_file_path = '/path/to/hadoop/conf/core-site.xml'

# Print the core settings that the file defines
display_core_components(config_file_path)


No 'core-site' section found in the configuration file.


In [None]:
#2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

from pywebhdfs.webhdfs import PyWebHdfsClient

def calculate_directory_size(hdfs_host, hdfs_port, directory_path, recursive=False):
    """Sum the ``length`` of the entries in an HDFS directory via WebHDFS.

    Args:
        hdfs_host: Host name of the WebHDFS endpoint.
        hdfs_port: Port of the WebHDFS endpoint.
        directory_path: HDFS directory to measure.
        recursive: When False (the default, matching the previous behaviour)
            only the entries listed directly under ``directory_path`` are
            summed. When True, every sub-directory is listed as well, so
            files nested at any depth are included.

    Returns:
        The sum of the ``length`` fields of all visited entries.
    """
    # One client is reused for every listing request.
    hdfs = PyWebHdfsClient(host=hdfs_host, port=hdfs_port)

    total_size = 0
    # Explicit work list instead of recursion, so deep trees cannot hit the
    # interpreter's recursion limit.
    pending = [directory_path]
    while pending:
        current = pending.pop()
        listing = hdfs.list_dir(current)
        for entry in listing['FileStatuses']['FileStatus']:
            total_size += entry['length']
            if recursive and entry.get('type') == 'DIRECTORY':
                pending.append(current.rstrip('/') + '/' + entry['pathSuffix'])

    return total_size

# WebHDFS endpoint and the HDFS directory to measure.
# NOTE(review): the directory path looks like a placeholder; replace before running.
hdfs_host = 'localhost'
hdfs_port = 9870
directory_path = '/path/to/hdfs/directory'

# Sum the sizes reported for the entries of that directory
total_file_size = calculate_directory_size(hdfs_host, hdfs_port, directory_path)

# Report the result
print("Total File Size:", total_file_size, "bytes")


In [None]:
# 3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.
from mrjob.job import MRJob
from mrjob.step import MRStep
import heapq


class TopNWords(MRJob):
    """MapReduce job that reports the N most frequent words of its input.

    Step 1 counts every whitespace-separated, lower-cased token. Step 2
    receives all ``(count, word)`` pairs under a single key and keeps the N
    largest.
    """

    # Used when --N is not given. Without a default, self.options.N would be
    # None and heapq.nlargest() would raise a TypeError.
    DEFAULT_TOP_N = 10

    def configure_args(self):
        """Register the ``--N`` command-line option (number of words to report)."""
        super().configure_args()
        self.add_passthru_arg(
            '--N',
            type=int,
            default=self.DEFAULT_TOP_N,
            help=f'Number of top words to display (default: {self.DEFAULT_TOP_N})',
        )

    def steps(self):
        """Return the two steps: count words, then select the top N."""
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_N)
        ]

    def mapper_get_words(self, _, line):
        """Emit ``(word, 1)`` for each whitespace-separated token of ``line``."""
        for word in line.strip().split():
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        """Pre-aggregate the counts of ``word`` before they reach the reducer."""
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        """Emit ``(total, word)`` under the key ``None``.

        Using one shared key sends every pair to the same reducer call in
        the next step.
        """
        yield None, (sum(counts), word)

    def reducer_find_top_N(self, _, word_counts):
        """Emit the N pairs with the highest counts as ``(word, count)``.

        Pairs are compared count first, so ties on the count are ordered by
        the word itself.
        """
        N = self.options.N
        top_N_words = heapq.nlargest(N, word_counts)
        for count, word in top_N_words:
            yield word, count

# Run the job when this code is executed as a script.
# NOTE(review): presumably this cell is meant to be saved as top_n_words.py and
# launched from a shell; confirm, since __name__ is also '__main__' in a notebook kernel.
if __name__ == '__main__':
    TopNWords.run()



In [None]:
# Run from a shell: python top_n_words.py <input_file> --N <number_of_top_words>

In [None]:
# 4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.
import requests

# Base URL of the NameNode; check_namenode_health() appends the JMX query to it.
# NOTE(review): the <...> parts are placeholders and must be filled in before running.
namenode_url = 'http://<namenode_host>:<namenode_port>'

# Base URLs of the DataNodes that check_datanode_health() visits, in order.
datanode_urls = [
    'http://<datanode1_host>:<datanode1_port>',
    'http://<datanode2_host>:<datanode2_port>',
    # Add more DataNode URLs if required
]

def check_namenode_health():
    """Return the state reported by the NameNode's JMX endpoint.

    Queries the ``NameNodeStatus`` bean under ``namenode_url`` and returns
    its ``State`` attribute.

    Returns:
        The reported state, or ``'Unknown'`` when the NameNode cannot be
        reached, answers with a non-200 status, sends a body that is not
        JSON, or the reply contains no bean with a ``State`` attribute.
    """
    url = f'{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus'
    try:
        # The timeout keeps an unreachable NameNode from hanging the check.
        response = requests.get(url, timeout=10)
    except requests.RequestException:
        return 'Unknown'

    if response.status_code != 200:
        return 'Unknown'

    try:
        beans = response.json().get('beans') or []
    except ValueError:
        # Body was not valid JSON.
        return 'Unknown'

    if not beans:
        return 'Unknown'
    return beans[0].get('State', 'Unknown')

def check_datanode_health():
    """Print the state reported by each DataNode listed in ``datanode_urls``.

    Every DataNode is checked independently: an unreachable node, a non-200
    reply, a non-JSON body, or a reply without a ``State`` attribute produces
    the failure line for that node, and the loop moves on to the next one.
    """
    for datanode_url in datanode_urls:
        url = f'{datanode_url}/jmx?qry=Hadoop:service=DataNode,name=FSDatasetState-*'
        failure_message = f'Failed to fetch health status for DataNode at {datanode_url}'

        try:
            # The timeout keeps one dead DataNode from stalling the whole sweep.
            response = requests.get(url, timeout=10)
        except requests.RequestException:
            print(failure_message)
            continue

        if response.status_code != 200:
            print(failure_message)
            continue

        try:
            beans = response.json().get('beans') or []
        except ValueError:
            beans = []

        # NOTE(review): confirm that the FSDatasetState bean actually exposes
        # a 'State' attribute on the target Hadoop version; when it does not,
        # the node is reported as a failed fetch instead of raising KeyError.
        state = beans[0].get('State') if beans else None
        if state is None:
            print(failure_message)
        else:
            print(f'DataNode at {datanode_url} is {state}')

# Query the NameNode once and report the state it returned
namenode_state = check_namenode_health()
print(f'NameNode is {namenode_state}')

# Report on every configured DataNode (prints one line per node)
check_datanode_health()


In [None]:
# 5. Develop a Python program that lists all the files and directories in a specific HDFS path.
from pywebhdfs.webhdfs import PyWebHdfsClient

# HDFS connection details
# NOTE(review): the <...> values are placeholders and must be filled in before running.
hdfs_host = '<hdfs_host>'
hdfs_port = '<hdfs_port>'
hdfs_user = '<hdfs_user>'

# HDFS path to list
hdfs_path = '/path/to/directory'

# Create a WebHDFS client
client = PyWebHdfsClient(host=hdfs_host, port=hdfs_port, user_name=hdfs_user)

# List files and directories in the specified HDFS path
response = client.list_dir(hdfs_path)

# A directory listing carries its entries under 'FileStatuses'. It has no
# 'boolean' field, so the presence of the payload itself is tested; indexing
# response['boolean'] raised KeyError on every successful listing.
# NOTE(review): presumably the client raises on HTTP errors rather than
# returning a 'RemoteException' body; the else branch is kept for that case
# but should be confirmed against the pywebhdfs version in use.
file_statuses = response.get('FileStatuses')
if file_statuses is not None:
    # List operation successful
    for file_status in file_statuses.get('FileStatus', []):
        file_type = file_status['type']
        file_path = file_status['pathSuffix']
        print(f"{file_type}: {file_path}")
else:
    # List operation failed
    error_message = response.get('RemoteException', {}).get('message', 'unknown error')
    print(f"Failed to list files and directories: {error_message}")

    

In [None]:
# 6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.
import requests

# Base URL of the NameNode; get_datanodes_storage_utilization() appends the JMX query to it.
# NOTE(review): the <...> parts are placeholders and must be filled in before running.
namenode_url = 'http://<namenode_host>:<namenode_port>'

def _parse_live_nodes(live_nodes):
    """Yield ``(name, used, capacity)`` for each node described by ``live_nodes``.

    The NameNodeInfo bean publishes ``LiveNodes`` as a JSON-encoded object
    keyed by node name, whose values carry ``usedSpace`` and ``capacity``.
    A ``name,used,capacity;name,used,capacity`` string is still accepted so
    that input handled by the previous implementation keeps working.
    """
    import json

    if isinstance(live_nodes, dict):
        nodes = live_nodes
    else:
        try:
            nodes = json.loads(live_nodes)
        except ValueError:
            nodes = None

    if isinstance(nodes, dict):
        for name, info in nodes.items():
            yield name, int(info['usedSpace']), int(info['capacity'])
        return

    # Legacy delimiter-separated form.
    for record in live_nodes.split(';'):
        if not record.strip():
            continue
        fields = record.split(',')
        yield fields[0], int(fields[1]), int(fields[2])


def get_datanodes_storage_utilization():
    """Return the storage utilization of every live DataNode.

    Reads the ``LiveNodes`` attribute of the NameNode's ``NameNodeInfo`` JMX
    bean under ``namenode_url``.

    Returns:
        Dict mapping DataNode name to ``used / capacity`` as a percentage
        rounded to two decimals. A node reporting zero capacity is listed
        with 0.0. The dict is empty when the NameNode cannot be reached,
        does not answer with HTTP 200, or returns no bean.
    """
    url = f'{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo'
    try:
        # The timeout keeps an unreachable NameNode from hanging the cell.
        response = requests.get(url, timeout=10)
    except requests.RequestException:
        return {}

    if response.status_code != 200:
        return {}

    beans = response.json().get('beans') or []
    if not beans:
        return {}
    live_nodes = beans[0].get('LiveNodes') or ''

    datanode_utilization = {}
    for datanode_name, used_storage, capacity_storage in _parse_live_nodes(live_nodes):
        if capacity_storage:
            utilization_percentage = round((used_storage / capacity_storage) * 100, 2)
        else:
            # Avoid ZeroDivisionError for a node that reports no capacity.
            utilization_percentage = 0.0
        datanode_utilization[datanode_name] = utilization_percentage

    return datanode_utilization

def get_highest_and_lowest_utilization_nodes(datanode_utilization):
    """Pick the most and least utilized nodes from a name -> percentage mapping.

    Returns:
        ``(highest_node, highest_value, lowest_node, lowest_value)``. When
        several nodes share an extreme value, the one that appears first in
        the mapping is returned. An empty mapping raises ``ValueError``.
    """
    def by_utilization(entry):
        # entry is a (node_name, utilization) pair
        return entry[1]

    top_node, top_value = max(datanode_utilization.items(), key=by_utilization)
    bottom_node, bottom_value = min(datanode_utilization.items(), key=by_utilization)
    return top_node, top_value, bottom_node, bottom_value

# Fetch the per-node utilization; an empty dict means the fetch failed
datanode_utilization = get_datanodes_storage_utilization()

if datanode_utilization:
    # One line per DataNode with its utilization percentage
    for datanode, utilization in datanode_utilization.items():
        print(f"DataNode: {datanode}, Utilization: {utilization}%")
    
    # Report the two extremes (the guard above keeps max()/min() from seeing an empty dict)
    highest_node, highest_utilization, lowest_node, lowest_utilization = get_highest_and_lowest_utilization_nodes(datanode_utilization)
    print(f"\nNode with highest storage utilization: {highest_node}, Utilization: {highest_utilization}%")
    print(f"Node with lowest storage utilization: {lowest_node}, Utilization: {lowest_utilization}%")
else:
    print("Failed to fetch DataNode storage utilization.")


In [None]:
# 7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.
import requests

# Base URL of the YARN ResourceManager; the functions below append '/ws/v1/cluster/...' paths to it.
# NOTE(review): the <...> parts are placeholders and must be filled in before running.
resourcemanager_url = 'http://<resourcemanager_host>:<resourcemanager_port>'

# Submit a Hadoop job
def submit_hadoop_job(jar_path, class_name, arguments):
    """Submit an application to the YARN ResourceManager over its REST API.

    First requests a new application id, then posts a submission context
    whose ApplicationMaster command runs ``class_name`` from ``jar_path``
    with ``arguments``.

    Args:
        jar_path: Location of the job jar, exposed to the container as ``app.jar``.
        class_name: Fully qualified main class to run.
        arguments: Iterable of string arguments appended to the command line.

    Returns:
        The application id when the submission is accepted (HTTP 202),
        otherwise ``None``. A message describing the outcome is printed
        either way.
    """
    new_application_url = f'{resourcemanager_url}/ws/v1/cluster/apps/new-application'
    response = requests.post(new_application_url, timeout=30)
    if response.status_code != 200:
        print('Failed to get new application ID.')
        return None

    application_id = response.json()['application-id']
    command_line = ' '.join(['java', '-cp', 'app.jar', class_name, *arguments])
    payload = {
        'application-id': application_id,
        'application-name': 'Hadoop Job',
        'am-container-spec': {
            'local-resources': {
                'entry': [
                    {'key': 'app.jar', 'value': {'resource': jar_path, 'type': 'FILE'}},
                ]
            },
            'commands': {
                'command': command_line
            },
            'environment': {
                'entry': [
                    {'key': 'CLASSPATH', 'value': 'app.jar'}
                ]
            },
            'resource': {'memory': 1024, 'vCores': 1}
        },
        'unmanaged-AM': False,
        'max-app-attempts': 1,
        'resource': {'memory': 1024, 'vCores': 1},
        'application-type': 'MAPREDUCE'
    }

    # Applications are submitted by POSTing the context to the apps
    # collection; the application id travels in the body, not in the URL.
    submit_url = f'{resourcemanager_url}/ws/v1/cluster/apps'
    response = requests.post(submit_url, json=payload, timeout=30)
    if response.status_code == 202:
        print('Hadoop job submitted successfully.')
        return application_id

    print('Failed to submit Hadoop job.')
    return None

# Get Hadoop job status
def get_hadoop_job_status(application_id):
    """Look up an application on the ResourceManager.

    Returns:
        ``(state, progress, finalStatus)`` taken from the ``app`` object of
        the reply, or ``(None, None, None)`` when the ResourceManager does
        not answer with HTTP 200.
    """
    app_url = f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}'
    reply = requests.get(app_url)
    if reply.status_code != 200:
        return None, None, None

    body = reply.json()
    return body['app']['state'], body['app']['progress'], body['app']['finalStatus']

# Get Hadoop job final output
def get_hadoop_job_output(application_id):
    """Return the ``diagnostics`` text of the first job of an application.

    Three requests are chained: the application's attempts, the containers
    of the first attempt, and the MapReduce job list reached through the
    proxy path built from the first container's id.

    Returns:
        The diagnostics string on success, otherwise a message naming the
        request that did not return HTTP 200.
    """
    # NOTE(review): the proxy path below is keyed by a container id; confirm
    # against the MapReduce ApplicationMaster REST API whether the
    # application id is expected there instead.
    attempts_url = f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}/appattempts'
    attempts_reply = requests.get(attempts_url)
    if attempts_reply.status_code != 200:
        return 'Failed to retrieve application attempts.'
    attempt_id = attempts_reply.json()['appAttempts']['appAttempt'][0]['appAttemptId']

    containers_url = f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}/appattempts/{attempt_id}/containers'
    containers_reply = requests.get(containers_url)
    if containers_reply.status_code != 200:
        return 'Failed to retrieve containers information.'
    container_id = containers_reply.json()['containers']['container'][0]['id']

    logs_url = f'{resourcemanager_url}/proxy/{container_id}/ws/v1/mapreduce/jobs'
    jobs_reply = requests.get(logs_url)
    if jobs_reply.status_code != 200:
        return 'Failed to retrieve job logs.'
    return jobs_reply.json()['jobs']['job'][0]['diagnostics']

import time

# Submit a Hadoop job
application_id = submit_hadoop_job('/path/to/hadoop-job.jar', 'com.example.HadoopJob', ['input', 'output'])

if application_id:
    # Application states after which nothing more will happen. Success is
    # reported through finalStatus ('SUCCEEDED') while the state itself
    # becomes 'FINISHED', so waiting for a 'SUCCEEDED' state never ended
    # the loop for a successful job.
    terminal_states = ('FINISHED', 'FAILED', 'KILLED')
    poll_interval_seconds = 5

    # Monitor Hadoop job progress
    while True:
        state, progress, final_status = get_hadoop_job_status(application_id)
        if state in terminal_states:
            print(f'Job state: {state}')
            print(f'Job final status: {final_status}')
            break
        if state == 'RUNNING':
            print(f'Job progress: {progress}')
        # Pause between polls instead of hammering the ResourceManager.
        time.sleep(poll_interval_seconds)

    # Retrieve Hadoop job output
    if final_status == 'SUCCEEDED':
        output = get_hadoop_job_output(application_id)
        print(f'Job output: {output}')
    else:
        print('Job did not succeed.')
else:
    print('Failed to submit Hadoop job.')


In [None]:
# 8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.
import requests

# Base URL of the YARN ResourceManager (reassigns the name already set in the previous exercise's cell).
# NOTE(review): the <...> parts are placeholders and must be filled in before running.
resourcemanager_url = 'http://<resourcemanager_host>:<resourcemanager_port>'

# Submit a Hadoop job with resource requirements
def submit_hadoop_job(jar_path, class_name, arguments, memory, vcores):
    """Submit an application with explicit resource requirements to YARN.

    First requests a new application id, then posts a submission context
    whose ApplicationMaster command runs ``class_name`` from ``jar_path``
    with ``arguments``.

    Args:
        jar_path: Location of the job jar, exposed to the container as ``app.jar``.
        class_name: Fully qualified main class to run.
        arguments: Iterable of string arguments appended to the command line.
        memory: Memory requested for the container; also used for the JVM
            ``-Xmx``/``-Xms`` flags with an ``m`` suffix.
        vcores: Number of virtual cores requested.

    Returns:
        The application id when the submission is accepted (HTTP 202),
        otherwise ``None``. A message describing the outcome is printed
        either way.
    """
    new_application_url = f'{resourcemanager_url}/ws/v1/cluster/apps/new-application'
    response = requests.post(new_application_url, timeout=30)
    if response.status_code != 200:
        print('Failed to get new application ID.')
        return None

    application_id = response.json()['application-id']
    # NOTE(review): the JVM heap is set to the full container size; confirm
    # this leaves enough headroom for non-heap memory on the target cluster.
    command_line = ' '.join(
        ['java', f'-Xmx{memory}m', f'-Xms{memory}m', '-cp', 'app.jar', class_name, *arguments]
    )
    payload = {
        'application-id': application_id,
        'application-name': 'Hadoop Job',
        'am-container-spec': {
            'local-resources': {
                'entry': [
                    {'key': 'app.jar', 'value': {'resource': jar_path, 'type': 'FILE'}},
                ]
            },
            'commands': {
                'command': command_line
            },
            'environment': {
                'entry': [
                    {'key': 'CLASSPATH', 'value': 'app.jar'}
                ]
            },
            'resource': {'memory': memory, 'vCores': vcores}
        },
        'unmanaged-AM': False,
        'max-app-attempts': 1,
        'resource': {'memory': memory, 'vCores': vcores},
        'application-type': 'MAPREDUCE'
    }

    # Applications are submitted by POSTing the context to the apps
    # collection; the application id travels in the body, not in the URL.
    submit_url = f'{resourcemanager_url}/ws/v1/cluster/apps'
    response = requests.post(submit_url, json=payload, timeout=30)
    if response.status_code == 202:
        print('Hadoop job submitted successfully.')
        return application_id

    print('Failed to submit Hadoop job.')
    return None

# Get resource usage for a Hadoop job
def get_resource_usage(application_id):
    """List the resources allocated to the containers of an application.

    Looks up the application's attempts, then the containers of the first
    attempt.

    Returns:
        A list of ``(container id, allocatedMB, allocatedVCores)`` tuples on
        success, otherwise a string naming the request that did not return
        HTTP 200.
    """
    url = f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}/appattempts'
    attempts_reply = requests.get(url)
    if attempts_reply.status_code != 200:
        return 'Failed to retrieve application attempts.'
    attempt_id = attempts_reply.json()['appAttempts']['appAttempt'][0]['appAttemptId']

    containers_url = f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}/appattempts/{attempt_id}/containers'
    containers_reply = requests.get(containers_url)
    if containers_reply.status_code != 200:
        return 'Failed to retrieve containers information.'

    return [
        (entry['id'], entry['allocatedMB'], entry['allocatedVCores'])
        for entry in containers_reply.json()['containers']['container']
    ]

# `time` is needed for the polling delay below and was not imported in this cell.
import time

# Submit a Hadoop job with resource requirements
application_id = submit_hadoop_job('/path/to/hadoop-job.jar', 'com.example.HadoopJob', ['input', 'output'], 1024, 1)

if application_id:
    # The application state is polled alongside the resource usage so the
    # loop ends once the application has stopped; it used to run forever.
    state_url = f'{resourcemanager_url}/ws/v1/cluster/apps/{application_id}/state'
    terminal_states = ('FINISHED', 'FAILED', 'KILLED')
    poll_interval_seconds = 5  # adjust to how often usage should be sampled

    # Monitor resource usage for the Hadoop job
    while True:
        resource_usage = get_resource_usage(application_id)
        if isinstance(resource_usage, list):
            for container_id, container_memory, container_vcores in resource_usage:
                print(f'Container ID: {container_id}, Memory: {container_memory}MB, vCores: {container_vcores}')
        else:
            # A string result is an error message from get_resource_usage().
            print(resource_usage)

        state_response = requests.get(state_url, timeout=10)
        if state_response.status_code == 200:
            application_state = state_response.json().get('state')
            if application_state in terminal_states:
                print(f'Application finished with state: {application_state}')
                break

        # Wait before sampling again
        time.sleep(poll_interval_seconds)
else:
    print('Failed to submit Hadoop job.')


In [None]:
# 9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.
import subprocess
import time

# Input file path
# NOTE(review): this and the jar path below look like placeholders; replace before running.
input_file = '/path/to/input/file.txt'

# Streaming job command line; run_mapreduce_job() executes it once per split size.
job_command = f'hadoop jar /path/to/hadoop-streaming.jar -input {input_file} -output output -mapper mapper.py -reducer reducer.py'

# Input split sizes to compare.
# NOTE(review): presumably these are bytes (unit of split.maxsize); confirm, as the values look very small for real inputs.
split_sizes = [10, 100, 1000]

# Function to run MapReduce job and measure execution time
def run_mapreduce_job(split_size):
    """Run the streaming job with a given maximum input split size and time it.

    The split size is passed to the job itself as a generic ``-D`` option.
    It used to be attached to the ``hdfs dfs -rm`` cleanup command, where it
    has no effect on the job, so every run used the same split size.

    Args:
        split_size: Value for ``mapreduce.input.fileinputformat.split.maxsize``.

    Returns:
        Wall-clock seconds spent running the job. Removal of the previous
        output directory is done before the clock starts and is not counted.

    Raises:
        ValueError: If ``job_command`` does not contain a ``jar`` token to
            anchor the option after.
    """
    # Imported locally so the function stays self-contained in its cell.
    import shlex

    job_args = shlex.split(job_command)
    if 'jar' not in job_args:
        raise ValueError("job_command must have the form 'hadoop jar <jar> ...'")

    # Generic options have to come right after the jar path, ahead of the
    # streaming-specific options such as -input and -mapper.
    insert_at = job_args.index('jar') + 2
    job_args[insert_at:insert_at] = [
        '-D', f'mapreduce.input.fileinputformat.split.maxsize={split_size}',
    ]

    # Remove the output of a previous run so the job can write again.
    subprocess.run(['hdfs', 'dfs', '-rm', '-r', '-f', 'output'])

    start_time = time.time()
    subprocess.run(job_args)
    end_time = time.time()
    return end_time - start_time

# Run the job once per configured split size and report how long each run took
for split_size in split_sizes:
    execution_time = run_mapreduce_job(split_size)
    print(f'Input Split Size: {split_size} - Execution Time: {execution_time} seconds')
