In [None]:
# Question 1
import configparser

def display_hadoop_components(config_path='hadoop.conf'):
    """Print selected Hadoop settings read from an INI-style configuration file.

    The file is parsed with ``configparser``, so it has to be organised in
    sections named ``core-site``, ``hdfs-site`` and ``yarn-site``. A section
    that is absent is skipped; a property that is absent is printed as
    ``None``.

    Args:
        config_path: Path of the configuration file. Defaults to
            ``'hadoop.conf'`` relative to the current working directory.

    Returns:
        None. The report is written to standard output.
    """
    # Interpolation is switched off so that a literal '%' inside a value is
    # returned as-is instead of raising when the value is read.
    config = configparser.ConfigParser(interpolation=None)

    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it actually parsed; an empty list means nothing was loaded.
    if not config.read(config_path):
        print(f"Hadoop configuration file not found or unreadable: {config_path}")
        return

    # (section, heading, ((label, property), ...)) in the order they are reported
    report_layout = (
        ('core-site', "Core Components of Hadoop:", (
            ("NameNode", 'fs.default.name'),
            ("SecondaryNameNode", 'dfs.namenode.secondary.http-address'),
            ("DataNode", 'dfs.datanode.http.address'),
        )),
        ('hdfs-site', "HDFS Components:", (
            ("Block Size", 'dfs.blocksize'),
            ("Replication Factor", 'dfs.replication'),
        )),
        ('yarn-site', "YARN Components:", (
            ("ResourceManager", 'yarn.resourcemanager.hostname'),
            ("NodeManager", 'yarn.nodemanager.hostname'),
        )),
    )

    for section_name, heading, properties in report_layout:
        if section_name not in config:
            continue
        section = config[section_name]
        print(heading)
        for label, property_name in properties:
            print(f"{label}: {section.get(property_name)}")

# Run the report against 'hadoop.conf' in the current working directory
display_hadoop_components()

In [None]:
# Question 2
from pywebhdfs.webhdfs import PyWebHdfsClient

def calculate_directory_size(hdfs_host, hdfs_port, hdfs_user, hdfs_directory, recursive=False):
    """Sum the sizes of the entries of an HDFS directory through WebHDFS.

    The size of each entry is taken from the ``length`` field of the directory
    listing itself, so one request is made per listed directory rather than
    one additional request per entry.

    Args:
        hdfs_host: Host name passed to ``PyWebHdfsClient``.
        hdfs_port: Port passed to ``PyWebHdfsClient``.
        hdfs_user: User name passed to ``PyWebHdfsClient``.
        hdfs_directory: Directory whose entries are summed.
        recursive: When true, entries whose ``type`` is ``'DIRECTORY'`` are
            listed as well and their contents added to the total. The default
            (``False``) only looks at the direct entries of ``hdfs_directory``.

    Returns:
        The sum of the ``length`` values of all visited entries, in bytes.
    """
    # Connect to HDFS
    hdfs = PyWebHdfsClient(host=hdfs_host, port=hdfs_port, user_name=hdfs_user)

    total_size = 0
    pending = [hdfs_directory]  # directories that still have to be listed

    while pending:
        directory = pending.pop()
        entries = hdfs.list_dir(directory)['FileStatuses']['FileStatus']
        for entry in entries:
            total_size += entry['length']
            if recursive and entry['type'] == 'DIRECTORY':
                # Strip a trailing slash first so the child path never contains '//'
                pending.append(f"{directory.rstrip('/')}/{entry['pathSuffix']}")

    return total_size

# Example usage: connection settings for the WebHDFS endpoint and the directory to measure
hdfs_host = 'localhost'
hdfs_port = 50070  # NOTE(review): presumably the NameNode web/WebHDFS port; confirm for the target cluster
hdfs_user = 'hadoop'
hdfs_directory = '/user/hadoop/data'

# Sum the entry sizes of the directory and report the total
total_size = calculate_directory_size(hdfs_host, hdfs_port, hdfs_user, hdfs_directory)
print(f"Total file size in {hdfs_directory}: {total_size} bytes")

In [None]:
# Question 3
from mrjob.job import MRJob
from mrjob.step import MRStep
import heapq

class TopNWords(MRJob):
    """Two-step job: count every word, then keep the N most frequent ones."""

    def configure_args(self):
        """Add the ``--N`` option (how many words to report, default 10)."""
        super().configure_args()
        self.add_passthru_arg('--N', type=int, default=10, help='Number of top words to display')

    def steps(self):
        """Return the counting step followed by the top-N selection step."""
        count_step = MRStep(mapper=self.mapper,
                            combiner=self.combiner,
                            reducer=self.reducer)
        select_step = MRStep(reducer=self.top_n_words)
        return [count_step, select_step]

    def mapper(self, _, line):
        """Emit ``(lower-cased word, 1)`` for each whitespace-separated token."""
        for token in line.split():
            yield token.lower(), 1

    def combiner(self, word, counts):
        """Pre-aggregate the counts of one word."""
        partial_total = sum(counts)
        yield word, partial_total

    def reducer(self, word, counts):
        """Emit ``(count, word)`` under the single key ``None`` so that the
        next step receives every pair in one group."""
        total = sum(counts)
        yield None, (total, word)

    def top_n_words(self, _, word_count_pairs):
        """Yield ``(word, count)`` for the N largest ``(count, word)`` pairs."""
        limit = self.options.N
        for count, word in heapq.nlargest(limit, word_count_pairs):
            yield word, count

# Launch the job when this code is executed as the main module.
# NOTE(review): run() presumably takes its options and input from the command
# line; confirm it behaves as intended when invoked from a notebook kernel.
if __name__ == '__main__':
    TopNWords.run()

In [None]:
# Question 4
import requests

def check_namenode_health(namenode_host, namenode_port, timeout=10):
    """Report whether a NameNode answers its JMX endpoint in the 'active' state.

    The ``State`` attribute is read from the ``NameNodeStatus`` JMX bean.
    Any failure to obtain it (connection error, timeout, non-200 response,
    unparsable body, no bean in the answer) is reported as unhealthy rather
    than raised, so the check can be used on nodes that are down.

    Args:
        namenode_host: Host name of the NameNode web endpoint.
        namenode_port: Port of the NameNode web endpoint.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True when the bean's ``State`` equals ``'active'``, otherwise False.
        NOTE(review): a standby NameNode is therefore reported as unhealthy;
        confirm that this is the intended meaning of "healthy".
    """
    url = f"http://{namenode_host}:{namenode_port}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return False
        beans = response.json().get('beans') or []
    except (requests.RequestException, ValueError):
        # Unreachable node, malformed URL, or a body that is not JSON
        return False

    if not beans:
        # The query matched no bean
        return False
    return beans[0].get('State') == 'active'

def check_datanode_health(datanode_host, datanode_port, timeout=10):
    """Report whether a DataNode answers its JMX endpoint in the 'normal' state.

    Any failure to obtain the state (connection error, timeout, non-200
    response, unparsable body, no bean in the answer, bean without a
    ``State`` attribute) is reported as unhealthy rather than raised.

    Args:
        datanode_host: Host name of the DataNode web endpoint.
        datanode_port: Port of the DataNode web endpoint.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        True when the first returned bean has ``State == 'normal'``,
        otherwise False.
    """
    # NOTE(review): confirm that the DataNodeInfo bean of the target Hadoop
    # version exposes a 'State' attribute with the value 'normal'; if it does
    # not, this check reports every DataNode as unhealthy.
    url = f"http://{datanode_host}:{datanode_port}/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo"
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            return False
        beans = response.json().get('beans') or []
    except (requests.RequestException, ValueError):
        # Unreachable node, malformed URL, or a body that is not JSON
        return False

    if not beans:
        # The query matched no bean
        return False
    return beans[0].get('State') == 'normal'

if __name__ == '__main__':
    # Replace the placeholders below with the values of the cluster to check
    namenode_host = '<namenode_host>'
    namenode_port = '<namenode_port>'
    # List every DataNode host explicitly. A literal `...` inside the list is
    # the Ellipsis object, which would be iterated as if it were a host name.
    datanode_hosts = ['<datanode1_host>', '<datanode2_host>']
    datanode_port = '<datanode_port>'

    namenode_status = check_namenode_health(namenode_host, namenode_port)
    print(f"NameNode status: {'Healthy' if namenode_status else 'Unhealthy'}")

    # All DataNodes are queried on the same port
    for datanode_host in datanode_hosts:
        datanode_status = check_datanode_health(datanode_host, datanode_port)
        print(f"DataNode {datanode_host} status: {'Healthy' if datanode_status else 'Unhealthy'}")

In [None]:
# Question 5
import pyarrow.hdfs as hdfs

def list_hdfs_path(hdfs_host, hdfs_port, hdfs_path):
    """Print, one per line, every entry listed under ``hdfs_path``.

    Args:
        hdfs_host: Host handed to ``hdfs.connect``.
        hdfs_port: Port handed to ``hdfs.connect``.
        hdfs_path: Path whose listing is printed.
    """
    connection = hdfs.connect(host=hdfs_host, port=hdfs_port)
    for entry in connection.ls(hdfs_path):
        print(entry)

if __name__ == '__main__':
    # Replace the placeholders below with the values of the cluster to inspect
    hdfs_host = '<hdfs_host>'
    # An unquoted <hdfs_port> placeholder is not valid Python, so a concrete
    # integer is used here. NOTE(review): 8020 is only an example value;
    # set it to the port the target NameNode actually listens on.
    hdfs_port = 8020
    hdfs_path = '<hdfs_path>'

    list_hdfs_path(hdfs_host, hdfs_port, hdfs_path)

In [None]:
# Question 6
import requests

def analyze_data_node_storage(hadoop_host, hadoop_port, timeout=10):
    """Print the storages with the highest and lowest capacity reported over JMX.

    The ``FSDatasetState-*`` beans of the given host are fetched, each bean's
    ``Capacity`` is recorded under an identifier parsed from its ``Storage``
    attribute, and the largest and smallest entries are printed.

    Args:
        hadoop_host: Host name of the JMX endpoint to query.
        hadoop_port: Port of the JMX endpoint to query.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        None. The analysis is written to standard output.

    Raises:
        requests.HTTPError: if the endpoint answers with an error status.
        requests.RequestException: if the request itself fails.
    """
    # Fetch the DataNodes information from Hadoop's REST API
    url = f"http://{hadoop_host}:{hadoop_port}/jmx?qry=Hadoop:service=DataNode,name=FSDatasetState-*"
    response = requests.get(url, timeout=timeout)
    # Fail on an error status here instead of on a confusing parse error below
    response.raise_for_status()
    beans = response.json().get('beans') or []

    # Extract storage capacities of DataNodes
    # NOTE(review): assumes every bean carries a 'Storage' attribute shaped
    # like "key=value,..." plus a 'Capacity' attribute; confirm against the
    # beans returned by the target Hadoop version.
    storage_capacities = {}
    for bean in beans:
        storage_id = bean['Storage'].split(",")[0].split("=")[1]
        storage_capacities[storage_id] = bean['Capacity']

    # max()/min() raise on an empty mapping, so report the empty case explicitly
    if not storage_capacities:
        print(f"No DataNode storage information returned by {hadoop_host}:{hadoop_port}.")
        return

    # Find the DataNode with the highest and lowest storage capacity
    highest_capacity_node = max(storage_capacities, key=storage_capacities.get)
    lowest_capacity_node = min(storage_capacities, key=storage_capacities.get)

    # Print the results
    print("DataNode Storage Utilization Analysis:")
    print("======================================")
    print(f"Highest Storage Capacity: {highest_capacity_node} - {storage_capacities[highest_capacity_node]}")
    print(f"Lowest Storage Capacity: {lowest_capacity_node} - {storage_capacities[lowest_capacity_node]}")

# Entry point: replace the placeholders with the host and port whose JMX
# endpoint should be queried, then run the analysis.
if __name__ == '__main__':
    hadoop_host = '<hadoop_host>'
    hadoop_port = '<hadoop_port>'
    
    analyze_data_node_storage(hadoop_host, hadoop_port)

In [None]:
# Question 7
import requests
import time

def submit_hadoop_job(resource_manager_host, resource_manager_port, job_file, input_path, output_path,
                      poll_interval=5, request_timeout=30):
    """Submit a Hadoop job through the ResourceManager REST API and follow it.

    The function
      1. asks the ResourceManager for a new application id,
      2. submits the application under that id,
      3. polls the application until it reaches FINISHED, FAILED or KILLED
         (or until the status can no longer be retrieved), and
      4. for a FINISHED application, prints the stdout log of its first
         container.

    Args:
        resource_manager_host: Host name of the ResourceManager web endpoint.
        resource_manager_port: Port of the ResourceManager web endpoint.
        job_file: Jar passed to ``hadoop jar`` in the launch command.
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.
        poll_interval: Seconds to sleep between two status requests.
        request_timeout: Seconds to wait for each HTTP request.

    Returns:
        The last application state reported by the ResourceManager (for
        example ``'FINISHED'``), or ``None`` when the job could not be
        submitted or no status was ever retrieved.
    """
    apps_url = f"http://{resource_manager_host}:{resource_manager_port}/ws/v1/cluster/apps"

    # Application ids are issued by the ResourceManager; the submit call itself
    # answers 202 without a body, so the id has to be obtained beforehand.
    response = requests.post(f"{apps_url}/new-application", timeout=request_timeout)
    if response.status_code != 200:
        print("Failed to submit Hadoop job.")
        return None
    application_id = response.json()['application-id']

    # Submit the Hadoop job to the ResourceManager API
    headers = {'Content-Type': 'application/json'}
    data = {
        'application-id': application_id,
        'application-name': 'My Hadoop Job',
        'am-container-spec': {
            'commands': {
                'command': f"hadoop jar {job_file} {input_path} {output_path}"
            },
            # NOTE(review): this local resource is a fixed placeholder and is
            # not derived from job_file; confirm what the job really needs.
            'local-resources': {
                'entry': [
                    {
                        'key': 'hadoop-mapreduce-examples.jar',
                        'value': {
                            'resource': 'file:/path/to/hadoop-mapreduce-examples.jar',
                            'type': 'FILE'
                        }
                    }
                ]
            }
        }
    }
    response = requests.post(apps_url, headers=headers, json=data, timeout=request_timeout)
    if response.status_code != 202:
        # Nothing was accepted, so there is nothing to monitor
        print("Failed to submit Hadoop job.")
        return None
    print("Hadoop job submitted successfully.")

    # Monitor the progress of the Hadoop job
    state = None  # stays None if the very first status request fails
    status_url = f"{apps_url}/{application_id}"
    while True:
        response = requests.get(status_url, timeout=request_timeout)
        if response.status_code != 200:
            print("Failed to retrieve Hadoop job status.")
            break

        app = response.json()['app']
        state = app['state']
        if state == 'FINISHED':
            # FINISHED only says the application ended; the outcome is in finalStatus
            final_status = app.get('finalStatus', 'SUCCEEDED')
            if final_status == 'SUCCEEDED':
                print("Hadoop job finished successfully.")
            else:
                print(f"Hadoop job finished with final status {final_status}.")
            break
        if state == 'FAILED':
            print("Hadoop job failed.")
            break
        if state == 'KILLED':
            # Terminal as well; without this branch the loop would never end
            print("Hadoop job was killed.")
            break

        print(f"Hadoop job progress: {app['progress']}")
        time.sleep(poll_interval)  # Wait before checking again

    # Retrieve the final output of the Hadoop job
    # NOTE(review): the two URLs below are kept as written; confirm that the
    # ResourceManager of the target cluster serves these paths (container
    # logs are usually served by a NodeManager).
    if state == 'FINISHED':
        url = f"http://{resource_manager_host}:{resource_manager_port}/ws/v1/cluster/apps/{application_id}/attempts/1/containers"
        response = requests.get(url, timeout=request_timeout)
        if response.status_code == 200:
            container_id = response.json()['containers']['container'][0]['containerId']
            url = f"http://{resource_manager_host}:{resource_manager_port}/ws/v1/node/containerlogs/{container_id}/stdout"
            response = requests.get(url, timeout=request_timeout)
            if response.status_code == 200:
                output = response.text
                print("Final output:")
                print(output)
            else:
                print("Failed to retrieve final output.")
        else:
            print("Failed to retrieve container details.")

    return state

# Entry point: replace the placeholders with the ResourceManager endpoint,
# the job jar and the input/output paths, then submit and follow the job.
if __name__ == '__main__':
    resource_manager_host = '<resource_manager_host>'
    resource_manager_port = '<resource_manager_port>'
    job_file = '<path_to_hadoop_job_jar_file>'
    input_path = '<input_path>'
    output_path = '<output_path>'

    submit_hadoop_job(resource_manager_host, resource_manager_port, job_file, input_path, output_path)

In [None]:
# Question 8
import requests
import time

def submit_hadoop_job(resource_manager_host, resource_manager_port, job_file, input_path, output_path, num_containers, memory_mb, vcores,
                      poll_interval=5, request_timeout=30):
    """Submit a Hadoop job with resource settings and report its progress.

    The function asks the ResourceManager for a new application id, submits
    the application under that id, and then polls it until it reaches
    FINISHED, FAILED or KILLED (or until the status can no longer be
    retrieved). While the application is running, the map and reduce progress
    is additionally read from the MapReduce REST API behind the application's
    tracking URL; failures of that extra request never stop the monitoring.

    Args:
        resource_manager_host: Host name of the ResourceManager web endpoint.
        resource_manager_port: Port of the ResourceManager web endpoint.
        job_file: Jar passed to ``hadoop jar`` in the launch command.
        input_path: Input path passed to the job.
        output_path: Output path passed to the job.
        num_containers: Sent as ``instances`` inside ``am-container-spec``.
        memory_mb: Sent as ``resources.memory`` inside ``am-container-spec``.
        vcores: Sent as ``resources.vcores`` inside ``am-container-spec``.
        poll_interval: Seconds to sleep between two status requests.
        request_timeout: Seconds to wait for each HTTP request.

    Returns:
        The last application state reported by the ResourceManager (for
        example ``'FINISHED'``), or ``None`` when the job could not be
        submitted or no status was ever retrieved.
    """
    apps_url = f"http://{resource_manager_host}:{resource_manager_port}/ws/v1/cluster/apps"

    # Application ids are issued by the ResourceManager; the submit call itself
    # answers 202 without a body, so the id has to be obtained beforehand.
    response = requests.post(f"{apps_url}/new-application", timeout=request_timeout)
    if response.status_code != 200:
        print("Failed to submit Hadoop job.")
        return None
    application_id = response.json()['application-id']

    # Submit the Hadoop job to the ResourceManager API
    headers = {'Content-Type': 'application/json'}
    data = {
        'application-id': application_id,
        'application-name': 'My Hadoop Job',
        'am-container-spec': {
            'commands': {
                'command': f"hadoop jar {job_file} {input_path} {output_path}"
            },
            # NOTE(review): this local resource is a fixed placeholder and is
            # not derived from job_file; confirm what the job really needs.
            'local-resources': {
                'entry': [
                    {
                        'key': 'hadoop-mapreduce-examples.jar',
                        'value': {
                            'resource': 'file:/path/to/hadoop-mapreduce-examples.jar',
                            'type': 'FILE'
                        }
                    }
                ]
            },
            # NOTE(review): confirm that the submit API honours 'resources'
            # and 'instances' at this position and under these names.
            'resources': {
                'memory': memory_mb,
                'vcores': vcores
            },
            'instances': num_containers
        }
    }
    response = requests.post(apps_url, headers=headers, json=data, timeout=request_timeout)
    if response.status_code != 202:
        # Nothing was accepted, so there is nothing to monitor
        print("Failed to submit Hadoop job.")
        return None
    print("Hadoop job submitted successfully.")

    # Monitor the progress of the Hadoop job
    state = None  # stays None if the very first status request fails
    status_url = f"{apps_url}/{application_id}"
    # MapReduce job ids share the numeric part of the application id
    job_id = application_id.replace('application_', 'job_', 1)
    while True:
        response = requests.get(status_url, timeout=request_timeout)
        if response.status_code != 200:
            print("Failed to retrieve Hadoop job status.")
            break

        app = response.json()['app']
        state = app['state']
        if state == 'FINISHED':
            # FINISHED only says the application ended; the outcome is in finalStatus
            final_status = app.get('finalStatus', 'SUCCEEDED')
            if final_status == 'SUCCEEDED':
                print("Hadoop job finished successfully.")
            else:
                print(f"Hadoop job finished with final status {final_status}.")
            break
        if state == 'FAILED':
            print("Hadoop job failed.")
            break
        if state == 'KILLED':
            # Terminal as well; without this branch the loop would never end
            print("Hadoop job was killed.")
            break

        print(f"Hadoop job progress: {app['progress']}")

        # Track map/reduce progress during job execution (best effort).
        # NOTE(review): assumes trackingUrl is an absolute URL that proxies to
        # the running MapReduce application master; confirm on the cluster.
        tracking_url = app.get('trackingUrl')
        if tracking_url:
            job_url = f"{tracking_url.rstrip('/')}/ws/v1/mapreduce/jobs/{job_id}"
            try:
                job_response = requests.get(job_url, timeout=request_timeout)
                if job_response.status_code == 200:
                    job = job_response.json()['job']
                    print(f"Map progress: {job['mapProgress']}")
                    print(f"Reduce progress: {job['reduceProgress']}")
                else:
                    print("Failed to retrieve job progress.")
            except (requests.RequestException, ValueError, KeyError):
                # The tracking endpoint is optional; keep monitoring the application
                print("Failed to retrieve job progress.")

        time.sleep(poll_interval)  # Wait before checking again

    return state

# Entry point: replace the placeholders with the ResourceManager endpoint,
# the job jar and the input/output paths; the three numeric settings are
# passed through to the submission request.
if __name__ == '__main__':
    resource_manager_host = '<resource_manager_host>'
    resource_manager_port = '<resource_manager_port>'
    job_file = '<path_to_hadoop_job_jar_file>'
    input_path = '<input_path>'
    output_path = '<output_path>'
    num_containers = 2  # Number of containers (task instances)
    memory_mb = 2048  # Memory per container in MB
    vcores = 2  # Number of virtual cores per container

    submit_hadoop_job(resource_manager_host, resource_manager_port, job_file, input_path, output_path, num_containers, memory_mb, vcores)

In [None]:
# Question 9
import time
import subprocess

def run_mapreduce_job(input_split_size,
                      jar_path='<path_to_mapreduce_jar_file>',
                      input_path='<mapreduce_input_path>',
                      output_path='<mapreduce_output_path>',
                      main_class=None):
    """Run a MapReduce job with a given maximum input split size.

    The command is passed to ``subprocess`` as an argument list, so no shell
    is involved and characters such as ``<``, ``>`` or spaces in the paths are
    handed to ``hadoop`` unchanged.

    Args:
        input_split_size: Value for
            ``mapreduce.input.fileinputformat.split.maxsize`` (bytes).
        jar_path: Jar handed to ``hadoop jar``.
        input_path: Input path handed to the job.
        output_path: Output path handed to the job.
        main_class: Optional main class or program name placed directly after
            the jar.

    Returns:
        The exit status of the ``hadoop`` command, or 127 when the ``hadoop``
        executable cannot be found.
    """
    command = ['hadoop', 'jar', jar_path]
    if main_class:
        command.append(main_class)
    # The -D option goes in front of the job's own arguments.
    # NOTE(review): it only takes effect if the job parses Hadoop's generic
    # options; confirm for the job being run.
    command += [
        '-D', f"mapreduce.input.fileinputformat.split.maxsize={input_split_size}",
        input_path,
        output_path,
    ]
    try:
        return subprocess.call(command)
    except FileNotFoundError:
        # Mirror the shell's "command not found" status instead of raising
        print("hadoop executable not found on PATH.")
        return 127

def compare_mapreduce_performance(input_split_sizes):
    """Time one MapReduce run per input split size.

    Args:
        input_split_sizes: Iterable of split sizes, each passed to
            ``run_mapreduce_job``.

    Returns:
        A list of ``(split_size, seconds)`` tuples in the order the sizes were
        given. Each measurement is also printed.
    """
    results = []
    for split_size in input_split_sizes:
        # perf_counter() is a monotonic clock, so the difference is not
        # affected by system clock adjustments during the run
        started = time.perf_counter()
        run_mapreduce_job(split_size)
        execution_time = time.perf_counter() - started
        print(f"Input Split Size: {split_size} - Execution Time: {execution_time} seconds")
        results.append((split_size, execution_time))
    return results

# Entry point: time the job once for each of three split sizes (64, 128 and 256 MiB)
if __name__ == '__main__':
    input_split_sizes = [64 * 1024 * 1024, 128 * 1024 * 1024, 256 * 1024 * 1024]  # Example input split sizes in bytes
    compare_mapreduce_performance(input_split_sizes)