In [None]:
#Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.

In [None]:
import configparser

def display_core_components(config_file_path):
    """Print the file-system scheme configured under ``fs.defaultFS``.

    Hadoop site files such as ``core-site.xml`` are XML documents shaped like
    ``<configuration><property><name>..</name><value>..</value></property>``,
    which ``configparser`` cannot read. The file is therefore parsed as XML
    first. For backward compatibility, an INI-style file with a
    ``[core-site]`` section is still accepted when the file is not a
    ``<configuration>`` XML document (or cannot be opened).

    Args:
        config_file_path: Path to the configuration file.

    Returns:
        None. The result (or a "not found" message) is printed.
    """
    import xml.etree.ElementTree as ET

    try:
        root = ET.parse(config_file_path).getroot()
    except (ET.ParseError, OSError):
        # Not XML, or unreadable: fall through to the legacy INI handling.
        root = None

    if root is not None and root.tag == 'configuration':
        found = True
        default_fs = ''
        for prop in root.iter('property'):
            if (prop.findtext('name') or '').strip() == 'fs.defaultFS':
                default_fs = (prop.findtext('value') or '').strip()
                break
    else:
        config = configparser.ConfigParser()
        # read() silently skips files it cannot open, leaving no sections.
        config.read(config_file_path)
        found = 'core-site' in config
        default_fs = config['core-site'].get('fs.defaultFS', '') if found else ''

    if not found:
        print("No 'core-site' section found in the configuration file.")
        return

    print("Core components of Hadoop:")
    # Only the URI scheme (the part before "://") is reported.
    print(default_fs.split("://")[0])

# Path to the Hadoop configuration file to inspect (placeholder value; replace with a real path)
config_file_path = '/path/to/hadoop/config/file.xml'

# Print the core component information found in that file
display_core_components(config_file_path)


In [None]:
#Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

In [None]:
from hdfs import InsecureClient

def calculate_directory_size(hdfs_url, hdfs_directory):
    """Return the combined size, in bytes, of every file under an HDFS directory.

    Args:
        hdfs_url: URL passed to ``InsecureClient``.
        hdfs_directory: HDFS path of the directory to measure.

    Returns:
        The sum of the ``length`` field of every file found while walking the
        directory, or ``None`` (after printing an error) when the path is not
        a directory.
    """
    client = InsecureClient(hdfs_url)

    # Confirm the path is a directory before walking it.
    directory_status = client.status(hdfs_directory)
    if directory_status['type'] != 'DIRECTORY':
        print("Error: Specified path is not a directory.")
        return None

    # With status=True, walk() yields files as (name, status) pairs, so each
    # file's length arrives with the listing and no per-file status() request
    # is needed.
    return sum(
        file_status['length']
        for _root, _dirs, files in client.walk(hdfs_directory, status=True)
        for _name, file_status in files
    )

# URL handed to the HDFS client and the directory whose files are summed
hdfs_url = 'http://localhost:9870'
hdfs_directory = '/user/hadoop/data'

# Compute the total; the function returns None when the path is not a directory
total_size = calculate_directory_size(hdfs_url, hdfs_directory)

# Only report a size when one was actually computed
if total_size is not None:
    print("Total file size:", total_size, "bytes")


In [None]:
# Create a Python program that extracts and displays the top N most frequent words from a large text file using the MapReduce approach.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re
from collections import Counter

# Matches each run of word characters (\w+) delimited by word boundaries (\b).
WORD_REGEX = re.compile(r"\b\w+\b")

class TopNWords(MRJob):
    """Two-step mrjob job that emits the N most frequent words of its input.

    Step 1 counts occurrences of each lower-cased word. Step 2 receives every
    ``(count, word)`` pair under a single key and keeps the N largest.
    ``N`` is read from the module-level name ``N`` when the final reducer runs.
    """

    def steps(self):
        """Declare the counting step followed by the top-N selection step."""
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_n)
        ]

    def mapper_get_words(self, _, line):
        """Yield ``(word, 1)`` for every WORD_REGEX match in ``line``, lower-cased."""
        for word in WORD_REGEX.findall(line):
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        """Pre-aggregate the partial counts for one word."""
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        """Yield the word's total as ``(count, word)`` under the shared key ``None``."""
        # A single key routes every pair to one reducer_find_top_n call.
        yield None, (sum(counts), word)

    def reducer_find_top_n(self, _, word_count_pairs):
        """Yield ``(word, count)`` for the N highest counts, largest first.

        The previous implementation rebuilt its Counter from
        ``most_common(N)``, which counts the ``(word, count)`` tuples
        themselves as elements and so corrupted the tally once more than N
        words had been seen. ``heapq.nlargest`` selects the top N pairs
        directly without holding every pair in memory.
        """
        import heapq

        # Pairs may arrive as lists or tuples depending on the protocol, so
        # compare on an explicit (count, word) key. Ties on count fall back to
        # the word.
        top_pairs = heapq.nlargest(
            N, word_count_pairs, key=lambda pair: (pair[0], pair[1])
        )
        for count, word in top_pairs:
            yield word, count


# Specify the path to your large text file
input_file_path = '/path/to/large/text/file.txt'
# Specify the value of N for the top N most frequent words
N = 10

if __name__ == '__main__':
    # MRJob.run() is a classmethod that takes no arguments (it reads the
    # command line), so passing args= to it fails. Build the job with explicit
    # arguments and drive a runner instead.
    # NOTE(review): mrjob generally expects the job class to live in a .py
    # file; confirm this works from the notebook kernel in your setup.
    job = TopNWords(args=[input_file_path])
    with job.make_runner() as runner:
        runner.run()
        # parse_output() decodes the runner's raw output into (key, value) pairs.
        for word, count in job.parse_output(runner.cat_output()):
            print(word, count)


In [None]:
#Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's REST API.


In [None]:
import requests

# JMX query URL requested by check_namenode_health();
# <namenode-hostname> and <port> are placeholders to fill in.
namenode_url = 'http://<namenode-hostname>:<port>/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus'

# JMX query URL requested by check_datanodes_health();
# <datanode-hostname> and <port> are placeholders to fill in.
datanodes_url = 'http://<datanode-hostname>:<port>/jmx?qry=Hadoop:service=DataNode,name=DataNodeStatus'

def check_namenode_health():
    """Return the NameNode state reported at ``namenode_url``.

    Returns:
        The ``State`` field of the first bean in the JSON response, or
        ``'Unknown'`` when the request fails, the response is not HTTP 200,
        the body is not JSON, or no bean with that field is returned.
    """
    try:
        # A timeout keeps an unreachable host from hanging the notebook.
        response = requests.get(namenode_url, timeout=10)
    except requests.RequestException:
        return 'Unknown'

    if response.status_code != 200:
        return 'Unknown'

    try:
        beans = response.json().get('beans') or []
    except ValueError:
        # Body was not valid JSON.
        return 'Unknown'

    # The bean list can be empty, so never index into it blindly.
    if not beans:
        return 'Unknown'
    return beans[0].get('State', 'Unknown')

def check_datanodes_health():
    """Return a mapping of bean name to DataNode state read from ``datanodes_url``.

    Returns:
        ``{bean['name']: bean['DatanodeState']}`` for every bean in the JSON
        response that has a ``name``. A bean without ``DatanodeState`` maps
        to ``'Unknown'``. An empty dict is returned when the request fails,
        the response is not HTTP 200, or the body is not JSON.
    """
    try:
        # A timeout keeps an unreachable host from hanging the notebook.
        response = requests.get(datanodes_url, timeout=10)
    except requests.RequestException:
        return {}

    if response.status_code != 200:
        return {}

    try:
        beans = response.json().get('beans') or []
    except ValueError:
        # Body was not valid JSON.
        return {}

    # Beans lacking a name cannot be keyed, so they are skipped rather than
    # raising KeyError.
    return {
        bean['name']: bean.get('DatanodeState', 'Unknown')
        for bean in beans
        if 'name' in bean
    }

# Query the NameNode and print the state that was returned
namenode_health = check_namenode_health()
print("NameNode health status:", namenode_health)

# Query the DataNodes endpoint and print one "name: state" line per entry
datanodes_health = check_datanodes_health()
print("DataNodes health status:")
for datanode, status in datanodes_health.items():
    print(f"{datanode}: {status}")


In [None]:
#Develop a Python program that lists all the files and directories in a specific HDFS path.


In [None]:
from hdfs import InsecureClient

def list_hdfs_path(hdfs_url, hdfs_path):
    """Print the type and full path of every entry directly under an HDFS path.

    Args:
        hdfs_url: URL passed to ``InsecureClient``.
        hdfs_path: HDFS directory whose entries are listed.

    Returns:
        None. One ``"<TYPE>: <path>"`` line is printed per entry.
    """
    import posixpath

    client = InsecureClient(hdfs_url)

    # With status=True, list() returns (name, status) tuples, not dicts, so
    # the tuple is unpacked and the full path is built from the parent
    # directory and the entry name.
    for entry_name, entry_status in client.list(hdfs_path, status=True):
        file_path = posixpath.join(hdfs_path, entry_name)
        print(f"{entry_status['type']}: {file_path}")

# URL handed to the HDFS client and the directory whose entries are printed
hdfs_url = 'http://localhost:9870'
hdfs_path = '/user/hadoop/data'

# Print one line per entry found under hdfs_path
list_hdfs_path(hdfs_url, hdfs_path)


In [None]:
#Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the nodes with the highest and lowest storage capacities.


In [None]:
from hdfs import InsecureClient

def analyze_storage_utilization(hdfs_url):
    """Print the live DataNodes with the largest and smallest storage capacity.

    DataNodes are not entries in the HDFS namespace, so listing an HDFS path
    such as ``/datanode`` cannot enumerate them, and a file ``length`` is not
    a node capacity. The per-node figures are read from the NameNode's JMX
    servlet instead: the ``LiveNodes`` attribute of the ``NameNodeInfo``
    bean, requested under ``hdfs_url``.

    Args:
        hdfs_url: Base HTTP URL of the NameNode (for example
            ``http://localhost:9870``).

    Returns:
        None. The analysis, or a failure / "no DataNodes" message, is printed.
    """
    import json
    from urllib.request import urlopen

    jmx_url = f"{hdfs_url.rstrip('/')}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
    try:
        # A timeout keeps an unreachable NameNode from hanging the notebook.
        with urlopen(jmx_url, timeout=10) as response:
            payload = json.load(response)
    except (OSError, ValueError) as error:
        # OSError covers URLError/HTTPError/timeouts; ValueError covers bad JSON.
        print(f"Failed to query NameNode JMX: {error}")
        return

    beans = payload.get('beans') or []
    # LiveNodes is itself a JSON-encoded string mapping each node to its stats.
    live_nodes = json.loads(beans[0].get('LiveNodes') or '{}') if beans else {}

    storage_capacity = {
        datanode: info['capacity']
        for datanode, info in live_nodes.items()
        if 'capacity' in info
    }

    if not storage_capacity:
        print("No DataNodes found.")
        return

    # Find DataNode with highest storage capacity
    max_storage_datanode = max(storage_capacity, key=storage_capacity.get)
    max_storage_capacity = storage_capacity[max_storage_datanode]

    # Find DataNode with lowest storage capacity
    min_storage_datanode = min(storage_capacity, key=storage_capacity.get)
    min_storage_capacity = storage_capacity[min_storage_datanode]

    print("Storage Utilization Analysis:")
    print("DataNode with Highest Storage Capacity:")
    print(f"  - DataNode: {max_storage_datanode}")
    print(f"  - Storage Capacity: {max_storage_capacity} bytes")

    print("DataNode with Lowest Storage Capacity:")
    print(f"  - DataNode: {min_storage_datanode}")
    print(f"  - Storage Capacity: {min_storage_capacity} bytes")

# URL passed to analyze_storage_utilization()
hdfs_url = 'http://localhost:9870'

# Print the nodes with the highest and lowest reported values
analyze_storage_utilization(hdfs_url)


In [None]:
# Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress, and retrieve the final output.


In [None]:
import requests
import time

# Base URL of the YARN ResourceManager REST API that every request in this
# cell is built from; <resource-manager-hostname> and <port> are placeholders.
resource_manager_url = 'http://<resource-manager-hostname>:<port>/ws/v1/cluster'

# Path to the job JAR, sent as the 'job.jar' local resource (placeholder value)
hadoop_job_jar_path = '/path/to/hadoop/job.jar'

# Main class interpolated into the 'hadoop jar' launch command
hadoop_job_main_class = 'com.example.HadoopJobMain'

# Input and output paths interpolated into the launch command (placeholder values)
input_path = '/path/to/input'
output_path = '/path/to/output'

def submit_hadoop_job():
    """Submit the job to the YARN ResourceManager and return its application ID.

    Submission is a two-step protocol: ``POST apps/new-application`` makes the
    ResourceManager allocate an application ID, then ``POST apps`` submits the
    context carrying that ID. A successful submission is answered with HTTP
    202 and an empty body, so the ID returned here is the one obtained in the
    first step rather than something parsed from the second response.

    Returns:
        The application ID string, or ``None`` (after printing a message) when
        either request fails.
    """
    timeout_seconds = 10  # keeps an unreachable ResourceManager from hanging the cell

    try:
        new_app_response = requests.post(
            f"{resource_manager_url}/apps/new-application", timeout=timeout_seconds
        )
        if new_app_response.status_code != 200:
            print("Failed to submit the Hadoop job.")
            return None
        new_application_id = new_app_response.json()['application-id']

        data = {
            # Must be the ID the ResourceManager just allocated.
            'application-id': new_application_id,
            'application-name': 'HadoopJob',
            'am-container-spec': {
                # NOTE(review): the REST API may also need visibility, size and
                # timestamp for each local resource - confirm against your cluster.
                'local-resources': {
                    'entry': [
                        {
                            'key': 'job.jar',
                            'value': {
                                'resource': hadoop_job_jar_path,
                                'type': 'FILE'
                            }
                        }
                    ]
                },
                'commands': {
                    'command': f'hadoop jar job.jar {hadoop_job_main_class} {input_path} {output_path}'
                }
            },
            # The resource request belongs to the submission context itself,
            # not to am-container-spec.
            'resource': {
                'memory': 1024,
                'vCores': 1
            }
        }

        response = requests.post(
            f"{resource_manager_url}/apps", json=data, timeout=timeout_seconds
        )
    except requests.RequestException:
        print("Failed to submit the Hadoop job.")
        return None

    if response.status_code == 202:
        return new_application_id

    print("Failed to submit the Hadoop job.")
    return None

def monitor_job_progress(application_id):
    """Print the job's state every five seconds until it reaches a final status.

    Args:
        application_id: ID of the application to poll under
            ``resource_manager_url``.

    Returns:
        None. Polling ends once ``finalStatus`` is no longer ``'UNDEFINED'``,
        or as soon as a status request is answered with a non-200 code.
    """
    while True:
        reply = requests.get(f"{resource_manager_url}/apps/{application_id}")

        # Bail out on the first failed status request.
        if reply.status_code != 200:
            print("Failed to retrieve job status.")
            return

        payload = reply.json()
        state = payload['app']['state']
        final_status = payload['app']['finalStatus']
        progress = payload['app']['progress']
        print(f"Job state: {state}, Final status: {final_status}, Progress: {progress}%")

        # Any value other than UNDEFINED means the job has finished.
        if final_status != 'UNDEFINED':
            return

        time.sleep(5)

def retrieve_job_output(app_id=None):
    """Report whether the job finished successfully.

    The application resource (``apps/{id}``) is queried rather than
    ``apps/{id}/state``: the state resource returns only a ``state`` field,
    so it has no ``app`` object to read ``finalStatus`` from.

    Args:
        app_id: Application ID to check. When omitted, the module-level
            ``application_id`` is used, which keeps existing no-argument
            calls working.

    Returns:
        None. The outcome is printed.
    """
    if app_id is None:
        # Backward compatibility with callers that rely on the global.
        app_id = application_id

    # A timeout keeps an unreachable ResourceManager from hanging the cell.
    response = requests.get(f"{resource_manager_url}/apps/{app_id}", timeout=10)
    if response.status_code != 200:
        print("Failed to retrieve job status.")
        return

    final_status = response.json()['app']['finalStatus']
    if final_status == 'SUCCEEDED':
        # Retrieve the job output from HDFS or any other storage location
        print("Job completed successfully.")
        # Add code to retrieve the output based on your storage configuration
    else:
        print("Job did not complete successfully.")

# Submit the Hadoop job; submit_hadoop_job() returns None when submission fails
application_id = submit_hadoop_job()

if application_id:
    # Print progress until a final status is reported or a status request fails
    monitor_job_progress(application_id)

    # Report whether the job succeeded (uses the module-level application_id)
    retrieve_job_output()


In [None]:
#Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource requirements, and track resource usage during job execution.


In [None]:
import requests
import time

# Base URL of the YARN ResourceManager REST API that every request in this
# cell is built from; <resource-manager-hostname> and <port> are placeholders.
resource_manager_url = 'http://<resource-manager-hostname>:<port>/ws/v1/cluster'

# Path to the job JAR, sent as the 'job.jar' local resource (placeholder value)
hadoop_job_jar_path = '/path/to/hadoop/job.jar'

# Main class interpolated into the 'hadoop jar' launch command
hadoop_job_main_class = 'com.example.HadoopJobMain'

# Input and output paths interpolated into the launch command (placeholder values)
input_path = '/path/to/input'
output_path = '/path/to/output'

# Resources requested in the submission payload: memory_mb is sent as
# 'memory' and vcores as 'vCores'
memory_mb = 2048
vcores = 2

def submit_hadoop_job():
    """Submit the job with explicit resource requirements and return its ID.

    Submission is a two-step protocol: ``POST apps/new-application`` makes the
    ResourceManager allocate an application ID, then ``POST apps`` submits the
    context carrying that ID. A successful submission is answered with HTTP
    202 and an empty body, so the ID returned here is the one obtained in the
    first step rather than something parsed from the second response.

    The requested memory and vcores come from the module-level ``memory_mb``
    and ``vcores``.

    Returns:
        The application ID string, or ``None`` (after printing a message) when
        either request fails.
    """
    timeout_seconds = 10  # keeps an unreachable ResourceManager from hanging the cell

    try:
        new_app_response = requests.post(
            f"{resource_manager_url}/apps/new-application", timeout=timeout_seconds
        )
        if new_app_response.status_code != 200:
            print("Failed to submit the Hadoop job.")
            return None
        new_application_id = new_app_response.json()['application-id']

        data = {
            # Must be the ID the ResourceManager just allocated.
            'application-id': new_application_id,
            'application-name': 'HadoopJob',
            'am-container-spec': {
                # NOTE(review): the REST API may also need visibility, size and
                # timestamp for each local resource - confirm against your cluster.
                'local-resources': {
                    'entry': [
                        {
                            'key': 'job.jar',
                            'value': {
                                'resource': hadoop_job_jar_path,
                                'type': 'FILE'
                            }
                        }
                    ]
                },
                'commands': {
                    'command': f'hadoop jar job.jar {hadoop_job_main_class} {input_path} {output_path}'
                }
            },
            # The resource request belongs to the submission context itself,
            # not to am-container-spec.
            'resource': {
                'memory': memory_mb,
                'vCores': vcores
            }
        }

        response = requests.post(
            f"{resource_manager_url}/apps", json=data, timeout=timeout_seconds
        )
    except requests.RequestException:
        print("Failed to submit the Hadoop job.")
        return None

    if response.status_code == 202:
        return new_application_id

    print("Failed to submit the Hadoop job.")
    return None

def track_resource_usage(application_id):
    """Poll the ResourceManager and print the latest attempt's container allocation.

    Every five seconds the newest application attempt is looked up and the
    state, memory and vcores of its container are printed. Polling stops when
    the container reports ``COMPLETE`` or when either lookup fails.

    Args:
        application_id: ID of the application to track under
            ``resource_manager_url``.

    Returns:
        None. All information is printed.
    """
    # resource_manager_url already ends in /ws/v1/cluster, so nothing extra
    # is inserted between it and "apps".
    app_url = f"{resource_manager_url}/apps/{application_id}"

    while True:
        response = requests.get(f"{app_url}/appattempts", timeout=10)
        if response.status_code != 200:
            print("Failed to retrieve application attempts.")
            break

        attempts = (response.json().get('appAttempts') or {}).get('appAttempt') or []
        if not attempts:
            # No attempt has been created yet; wait instead of indexing an
            # empty list.
            print("No application attempts reported yet.")
            time.sleep(5)
            continue

        current_attempt = attempts[-1]
        container_id = current_attempt['containerId']
        # The containers resource is addressed by attempt ID, not by container
        # ID. Use the reported attempt ID when present; otherwise derive it
        # from the application ID and the numeric attempt id
        # (appattempt_<cluster-ts>_<app-seq>_<6-digit attempt>).
        attempt_id = current_attempt.get('appAttemptId')
        if not attempt_id:
            attempt_prefix = application_id.replace('application_', 'appattempt_', 1)
            attempt_id = f"{attempt_prefix}_{int(current_attempt['id']):06d}"

        container_response = requests.get(
            f"{app_url}/appattempts/{attempt_id}/containers/{container_id}",
            timeout=10,
        )
        if container_response.status_code != 200:
            # Stop here: there is no container state to evaluate this round.
            print("Failed to retrieve container information.")
            break

        container_data = container_response.json()['container']
        # NOTE(review): field names vary by Hadoop version; both the flat
        # (containerState/allocatedMB/allocatedVCores) and the nested
        # (state/allocatedResources) layouts are accepted - confirm which one
        # your cluster returns.
        allocated_resources = container_data.get('allocatedResources') or {}
        container_state = container_data.get('containerState', container_data.get('state'))
        allocated_memory_mb = container_data.get('allocatedMB', allocated_resources.get('memory'))
        allocated_vcores = container_data.get('allocatedVCores', allocated_resources.get('vCores'))

        print(f"Container state: {container_state}")
        print(f"Allocated memory: {allocated_memory_mb} MB")
        print(f"Allocated vCores: {allocated_vcores}")

        if container_state == 'COMPLETE':
            break

        time.sleep(5)

# Submit the Hadoop job; submit_hadoop_job() returns None when submission fails
application_id = submit_hadoop_job()

if application_id:
    # Print the container's state and allocation until it completes or a lookup fails
    track_resource_usage(application_id)


In [None]:
#Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing the impact on overall job execution time.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import time

class MapReduceJob(MRJob):
    """Sum the character lengths of all input lines, with a configurable split size.

    The job is deliberately trivial; it exists so runs with different
    ``--split-size`` values can be timed against each other.
    """

    def configure_args(self):
        """Register the ``--split-size`` option (megabytes, default 64)."""
        super().configure_args()
        self.add_passthru_arg('--split-size', default=64, type=int, help='Input split size in megabytes')

    def jobconf(self):
        """Forward ``--split-size`` to Hadoop as input split bounds.

        Without this hook the option is parsed but never reaches the job, so
        every run would use the same splits and the comparison would measure
        nothing.

        NOTE(review): these properties are presumably only honoured by Hadoop
        runners (local/inline runs likely ignore them), and how strictly they
        are applied depends on the input format - confirm on your cluster.
        """
        conf = dict(super().jobconf() or {})
        split_bytes = str(self.options.split_size * 1024 * 1024)
        conf['mapreduce.input.fileinputformat.split.minsize'] = split_bytes
        conf['mapreduce.input.fileinputformat.split.maxsize'] = split_bytes
        return conf

    def mapper(self, _, line):
        """Yield the length of the line under the shared key ``None``."""
        yield None, len(line)

    def reducer(self, _, lengths):
        """Yield the total of all line lengths."""
        yield None, sum(lengths)

    def steps(self):
        """Declare the single map/reduce step."""
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer)
        ]

# Specify the input file path
input_file_path = '/path/to/input/file.txt'

# Specify the different input split sizes to compare
split_sizes = [64, 128, 256]

# Run the MapReduce job with different input split sizes and compare the execution times
for split_size in split_sizes:
    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by system clock adjustments the way time.time() can.
    start_time = time.perf_counter()

    # Run the MapReduce job
    job = MapReduceJob(args=[input_file_path, f'--split-size={split_size}'])
    with job.make_runner() as runner:
        runner.run()

        # cat_output() yields raw byte chunks, not lines; parse_output()
        # decodes them into (key, value) pairs using the job's output protocol.
        output = list(job.parse_output(runner.cat_output()))

    end_time = time.perf_counter()

    # Calculate the execution time
    execution_time = end_time - start_time

    # Print the results
    print(f"Input Split Size: {split_size} MB")
    print("Output:", output)
    print("Execution Time:", execution_time, "seconds")
    print("-" * 40)
