In [1]:
# 1. Write a Python program to read a Hadoop configuration file and display the core components of Hadoop.

def read_hadoop_config(config_file):
    """Parse a ``key=value`` style configuration file into a dictionary.

    Blank lines and lines starting with ``#`` are ignored. Every other line is
    split on its FIRST ``=`` only, so values that themselves contain ``=``
    (for example JVM options such as ``-Dfoo=bar``) are kept intact.

    Args:
        config_file: Path of the text file to read.

    Returns:
        dict mapping stripped keys to stripped values. When a key appears
        more than once, the last occurrence wins.

    Raises:
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``).
        ValueError: If a non-comment, non-blank line has no ``=`` separator.
    """
    config = {}
    with open(config_file, 'r') as f:
        for line_number, raw_line in enumerate(f, start=1):
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            # partition() splits on the first '=' only; a plain split('=')
            # fails to unpack as soon as the value contains another '='.
            key, separator, value = line.partition('=')
            if not separator:
                raise ValueError(
                    "{}:{}: expected 'key=value', got {!r}".format(config_file, line_number, line)
                )
            config[key.strip()] = value.strip()
    return config

def display_core_components(config):
    """Print one labelled line per core Hadoop setting present in ``config``.

    Args:
        config: Mapping of configuration keys to values. Only the four keys
            listed below are inspected; keys that are absent are skipped
            silently and any other keys are ignored.

    Returns:
        None. The lines are written to standard output, in the order below.
    """
    # (configuration key, output template) pairs, in display order.
    component_templates = (
        ('fs.defaultFS', 'File System: {}'),
        ('mapreduce.framework.name', 'MapReduce Framework: {}'),
        ('dfs.nameservices', 'NameNode: {}'),
        ('yarn.resourcemanager.hostname', 'ResourceManager: {}'),
    )
    for setting, template in component_templates:
        if setting in config:
            print(template.format(config[setting]))

# Example usage
# NOTE: read_hadoop_config() opens this path directly, so 'hadoop.conf' must exist in the
# current working directory; otherwise this cell stops with FileNotFoundError.
config_file = 'hadoop.conf'
# Parse the file into a dict, then print whichever core settings it contains.
config = read_hadoop_config(config_file)
display_core_components(config)


FileNotFoundError: [Errno 2] No such file or directory: 'hadoop.conf'

In [None]:
# 2. Implement a Python function that calculates the total file size in a Hadoop Distributed File System (HDFS) directory.

import subprocess

def calculate_directory_size(directory_path):
    """Return the total size reported by ``hdfs dfs -du -s`` for a path.

    The command's standard output is decoded as UTF-8 and the first
    whitespace-separated column of every non-blank line is summed, so the
    function also works when the command prints several summary lines.

    Args:
        directory_path: HDFS path handed to ``hdfs dfs -du -s``.

    Returns:
        int: Sum of the first column over all output lines; ``0`` when the
        command prints nothing.

    Raises:
        subprocess.CalledProcessError: If the ``hdfs`` command exits non-zero.
        OSError: If the ``hdfs`` executable cannot be started.
        ValueError: If the first column of a line is not an integer.
    """
    command = ['hdfs', 'dfs', '-du', '-s', directory_path]
    output = subprocess.check_output(command).decode('utf-8')
    total_size = 0
    for line in output.splitlines():
        fields = line.split()
        # Blank lines (including completely empty output) carry no size;
        # indexing fields[0] on them would raise IndexError.
        if fields:
            total_size += int(fields[0])
    return total_size

# Example usage
# calculate_directory_size() shells out to the `hdfs` command, so this cell needs a
# working Hadoop client on the machine running the notebook.
directory_path = '/user/myuser/data'
total_size = calculate_directory_size(directory_path)
print('Total File Size: {} bytes'.format(total_size))


In [None]:
# 3. Create a Python program that extracts and displays the top N most frequent words from a large text file using the 
# MapReduce approach.

from collections import Counter
import multiprocessing
import re

def map_function(line):
    """Map step: count the words in one line of text, ignoring case.

    Args:
        line: A string of text.

    Returns:
        collections.Counter mapping each lower-cased ``\\w+`` token found in
        ``line`` to the number of times it occurs.
    """
    lowered = line.lower()
    tokens = re.compile(r'\w+').findall(lowered)
    return Counter(tokens)

def reduce_function(counters):
    """Reduce step: merge an iterable of Counters into a single Counter.

    Args:
        counters: Iterable of ``collections.Counter`` objects.

    Returns:
        A new Counter holding the element-wise sum of all inputs (an empty
        Counter when ``counters`` is empty). The inputs are not modified.
    """
    merged = Counter()
    for partial in counters:
        # Counter addition builds a new object, leaving the inputs untouched.
        merged = merged + partial
    return merged

def process_chunk(chunk):
    """Count the words of a whole chunk of lines.

    Args:
        chunk: Iterable of text lines.

    Returns:
        The result of ``reduce_function`` applied to the per-line results of
        ``map_function``.
    """
    per_line_counts = (map_function(line) for line in chunk)
    return reduce_function(per_line_counts)

def get_top_words(filename, top_n, chunk_size=1000):
    """Return the ``top_n`` most frequent words of a text file.

    The file is read sequentially and cut into chunks of ``chunk_size``
    lines. Each chunk is counted by ``process_chunk`` in a worker process and
    the partial counts are merged with ``reduce_function``.

    Unlike a fixed number of ``readline()`` calls, the whole file is always
    processed regardless of its length, and no empty chunks are dispatched
    for short files. The worker pool is released when the function returns
    or raises.

    Args:
        filename: Path of the text file to analyse.
        top_n: Number of (word, count) pairs to return.
        chunk_size: Number of lines handed to each worker task (default 1000).

    Returns:
        list of ``(word, count)`` tuples, most frequent first.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        OSError: If the file cannot be opened.
    """
    if chunk_size < 1:
        raise ValueError('chunk_size must be at least 1')

    pending = []
    # Both the pool and the file are context-managed so neither leaks when
    # reading or a worker fails.
    with multiprocessing.Pool() as pool, open(filename, 'r') as file:
        chunk = []
        for line in file:
            chunk.append(line)
            if len(chunk) >= chunk_size:
                pending.append(pool.apply_async(process_chunk, args=(chunk,)))
                chunk = []
        if chunk:
            # Final, partially filled chunk.
            pending.append(pool.apply_async(process_chunk, args=(chunk,)))
        # Collect results before leaving the block: exiting the pool's
        # context terminates the workers.
        counters = [result.get() for result in pending]
    total_counter = reduce_function(counters)
    return total_counter.most_common(top_n)

# Example usage
# 'large_text_file.txt' is opened relative to the current working directory.
filename = 'large_text_file.txt'
top_n = 10
top_words = get_top_words(filename, top_n)
# most_common() yields (word, count) pairs, most frequent first.
for word, count in top_words:
    print('{}: {}'.format(word, count))



In [None]:
# 4. Write a Python script that checks the health status of the NameNode and DataNodes in a Hadoop cluster using Hadoop's 
# REST API.

import requests

def check_namenode_status(nn_url):
    """Print the NameNode state and its live/dead DataNode counts.

    Two JMX beans are queried through the NameNode's ``/jmx`` servlet:

    * ``NameNodeStatus`` provides ``State``.
    * ``FSNamesystemState`` provides ``NumLiveDataNodes`` and
      ``NumDeadDataNodes``; the ``NameNodeStatus`` bean does not carry these
      counters, so reading them from it fails with ``KeyError``.

    Args:
        nn_url: Base URL of the NameNode web endpoint, without a trailing
            slash (e.g. ``'http://namenode:50070'``).

    Returns:
        None. The three values are printed to standard output.

    Raises:
        requests.RequestException: On connection problems, timeouts or a
            non-success HTTP status.
        IndexError / KeyError: If a bean or attribute is missing from the
            response.
    """
    def fetch_bean(bean_query):
        # A timeout keeps an unreachable node from hanging the notebook, and
        # raise_for_status() reports HTTP errors before JSON parsing.
        response = requests.get(nn_url + '/jmx?qry=' + bean_query, timeout=10)
        response.raise_for_status()
        return response.json()['beans'][0]

    status_bean = fetch_bean('Hadoop:service=NameNode,name=NameNodeStatus')
    fs_state_bean = fetch_bean('Hadoop:service=NameNode,name=FSNamesystemState')

    state = status_bean['State']
    live_nodes = fs_state_bean['NumLiveDataNodes']
    dead_nodes = fs_state_bean['NumDeadDataNodes']
    print('NameNode State: {}'.format(state))
    print('Live DataNodes: {}'.format(live_nodes))
    print('Dead DataNodes: {}'.format(dead_nodes))

def check_datanode_status(dn_url):
    """Print state and storage figures read from a DataNode's JMX endpoint.

    The first bean returned for the ``DataNodeInfo`` query is used and its
    ``State``, ``Capacity``, ``DfsUsed`` and ``Remaining`` attributes are
    printed, one per line.

    NOTE(review): whether the ``DataNodeInfo`` bean really exposes these four
    attributes cannot be confirmed from this file -- verify against the
    target Hadoop version (a missing attribute raises ``KeyError``).

    Args:
        dn_url: Base URL of the DataNode web endpoint, without a trailing
            slash.

    Returns:
        None. Output goes to standard output.
    """
    jmx_url = dn_url + '/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo'
    bean = requests.get(jmx_url).json()['beans'][0]

    # Read every attribute before printing anything, so a missing key fails
    # before any partial output is produced.
    state, capacity, used, remaining = [
        bean[attribute] for attribute in ('State', 'Capacity', 'DfsUsed', 'Remaining')
    ]

    report = (
        ('DataNode State: {}', state),
        ('Capacity: {} bytes', capacity),
        ('Used: {} bytes', used),
        ('Remaining: {} bytes', remaining),
    )
    for template, value in report:
        print(template.format(value))

# Example usage
# Base URLs of the NameNode and DataNode web endpoints; the check functions append the
# '/jmx?qry=...' query to them. NOTE(review): ports 50070/50075 are presumably the
# cluster's HTTP ports -- confirm for the Hadoop version in use.
namenode_url = 'http://namenode:50070'
datanode_url = 'http://datanode:50075'
check_namenode_status(namenode_url)
check_datanode_status(datanode_url)


In [None]:
# 5. Develop a Python program that lists all the files and directories in a specific HDFS path.

import subprocess

def list_hdfs_path(path):
    """List the entries that ``hdfs dfs -ls`` reports for ``path``.

    Each listing line is expected to have eight whitespace-separated columns
    (permissions, replication, owner, group, size, date, time, path). The
    line is split into at most eight fields so that a path containing spaces
    is returned whole instead of being cut at its last space.

    The ``Found N items`` summary line is recognised by its text rather than
    by position, so an entry is not lost when the command prints no summary
    line.

    Args:
        path: HDFS path to list.

    Returns:
        list of str: The path column of every entry, in output order; an
        empty list when the command prints nothing.

    Raises:
        subprocess.CalledProcessError: If the ``hdfs`` command exits non-zero.
        OSError: If the ``hdfs`` executable cannot be started.
    """
    command = ['hdfs', 'dfs', '-ls', path]
    output = subprocess.check_output(command).decode('utf-8')
    files = []
    for line in output.splitlines():
        line = line.strip()
        # Skip blank lines and the "Found N items" summary.
        if not line or line.startswith('Found '):
            continue
        fields = line.split(None, 7)
        # With the expected eight columns the last field is the full path;
        # otherwise fall back to the last token of the line.
        files.append(fields[7] if len(fields) == 8 else fields[-1])
    return files

# Example usage
# list_hdfs_path() shells out to the `hdfs` command, so a Hadoop client must be available.
hdfs_path = '/user/myuser/data'
files = list_hdfs_path(hdfs_path)
# Print one listed path per line.
for file in files:
    print(file)


In [None]:
#6. Implement a Python program that analyzes the storage utilization of DataNodes in a Hadoop cluster and identifies the
# nodes with the highest and lowest storage capacities.

import requests

def get_datanode_stats(dn_url):
    """Fetch the first bean of a DataNode's ``FSDatasetState`` JMX query.

    Args:
        dn_url: Base URL of the DataNode web endpoint, without a trailing
            slash.

    Returns:
        The first element of the ``beans`` list in the JSON response.

    NOTE(review): the bean name is hard-coded with the
    ``-UndefinedStorageId`` suffix; whether that matches the beans exposed by
    the target cluster should be confirmed.
    """
    jmx_query = '/jmx?qry=Hadoop:service=DataNode,name=FSDatasetState-UndefinedStorageId'
    payload = requests.get(dn_url + jmx_query).json()
    return payload['beans'][0]

def analyze_storage_utilization(datanode_urls):
    """Compute the storage utilization of each DataNode, sorted ascending.

    For every URL the statistics are fetched with ``get_datanode_stats`` and
    the utilization is computed as ``DfsUsed / Capacity``.

    Args:
        datanode_urls: Iterable of DataNode base URLs.

    Returns:
        list of ``(url, utilization)`` tuples sorted from lowest to highest
        utilization, where utilization is a fraction (0.5 == 50 %). A node
        reporting a capacity of 0 is given a utilization of ``0.0`` instead
        of raising ``ZeroDivisionError``.
    """
    storage_utilizations = []
    for url in datanode_urls:
        stats = get_datanode_stats(url)
        capacity = stats['Capacity']
        used = stats['DfsUsed']
        # A node with no configured or healthy volumes reports zero capacity;
        # treat it as unused rather than dividing by zero.
        utilization = used / capacity if capacity else 0.0
        storage_utilizations.append((url, utilization))

    storage_utilizations.sort(key=lambda x: x[1])
    return storage_utilizations

# Example usage
datanode_urls = ['http://datanode1:50075', 'http://datanode2:50075', 'http://datanode3:50075']
storage_utilizations = analyze_storage_utilization(datanode_urls)

# The list is sorted by ascending utilization, so the last five entries are the
# most utilized nodes and the first five the least utilized.
print('Datanodes with highest storage utilization:')
for url, utilization in storage_utilizations[-5:]:
    print('{} - Utilization: {:.2%}'.format(url, utilization))

print('\nDatanodes with lowest storage utilization:')
for url, utilization in storage_utilizations[:5]:
    print('{} - Utilization: {:.2%}'.format(url, utilization))


# 7. Here's a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its
# progress, and retrieve the final output:

import requests
import time

def submit_hadoop_job(rm_url, jar_path, main_class, input_path, output_path):
    """Submit an application through the YARN ResourceManager REST API.

    Submission is a two-step protocol:

    1. ``POST /ws/v1/cluster/apps/new-application`` to obtain an application
       id.
    2. ``POST /ws/v1/cluster/apps`` with the submission context as JSON. The
       ResourceManager answers ``202 Accepted`` when the application was
       queued. (The submission endpoint is the ``apps`` collection; it is
       not a ``PUT`` on ``/apps/{id}/app``.)

    Args:
        rm_url: Base URL of the ResourceManager, without a trailing slash.
        jar_path: Jar passed to ``hadoop jar``.
        main_class: Main class passed to ``hadoop jar``.
        input_path: Input path argument of the job.
        output_path: Output path argument of the job.

    Returns:
        The application id string on success, ``None`` when either request
        is rejected.

    Raises:
        requests.RequestException: On connection problems or timeouts.
    """
    new_app_url = rm_url + '/ws/v1/cluster/apps/new-application'
    response = requests.post(new_app_url, timeout=30)
    if response.status_code != 200:
        # Without an application id there is nothing to submit.
        print('Job submission failed.')
        return None
    app_id = response.json()['application-id']

    submit_job_url = rm_url + '/ws/v1/cluster/apps'
    payload = {
        'application-id': app_id,
        'application-name': 'MyHadoopJob',
        'am-container-spec': {
            'commands': {
                # NOTE(review): this runs `hadoop jar` as the application
                # master command; confirm this is what the cluster expects.
                'command': 'hadoop jar {} {} {} {}'.format(jar_path, main_class, input_path, output_path)
            }
        },
        'unmanaged-AM': False,
        'max-app-attempts': 1
    }

    response = requests.post(submit_job_url, json=payload, timeout=30)
    if response.status_code == 202:
        print('Job submitted successfully.')
        return app_id
    else:
        print('Job submission failed.')
        return None

def monitor_job_progress(rm_url, app_id):
    """Poll the ResourceManager until the application reaches a final state.

    Every five seconds the application resource is fetched and its ``state``
    and ``progress`` fields are printed. Polling stops once the state is
    ``FINISHED``, ``KILLED`` or ``FAILED``.

    Args:
        rm_url: Base URL of the ResourceManager, without a trailing slash.
        app_id: Application id string.

    Returns:
        None. Status lines are written to standard output.
    """
    terminal_states = ['FINISHED', 'KILLED', 'FAILED']
    status_url = ''.join([rm_url, '/ws/v1/cluster/apps/', app_id])

    done = False
    while not done:
        app_info = requests.get(status_url).json()
        current_state = app_info['app']['state']
        current_progress = app_info['app']['progress']
        print('Job State: {}, Progress: {}%'.format(current_state, current_progress))

        done = current_state in terminal_states
        if not done:
            # Wait before the next poll; no wait once a final state is seen.
            time.sleep(5)

def retrieve_job_output(hdfs_url, output_path):
    """Download a file from HDFS through the WebHDFS REST API and print it.

    The file is read with ``GET <hdfs_url>/webhdfs/v1<output_path>?op=OPEN``.
    A plain ``GET`` on ``<hdfs_url><output_path>`` is not a WebHDFS request
    and does not return file contents. ``requests`` follows the redirect
    that WebHDFS issues towards the DataNode serving the data.

    Args:
        hdfs_url: Base URL of the NameNode HTTP endpoint. If it already
            contains ``/webhdfs/v1`` that prefix is not added again.
        output_path: Absolute HDFS path of the file to read.

    Returns:
        None. The content, or a failure message, is printed.

    Raises:
        requests.RequestException: On connection problems or timeouts.

    NOTE(review): ``op=OPEN`` works on files only. If ``output_path`` is a
    job output directory the request fails and the failure message is
    printed -- confirm which path callers pass.
    """
    base_url = hdfs_url.rstrip('/')
    if '/webhdfs/v1' not in base_url:
        base_url += '/webhdfs/v1'
    if not output_path.startswith('/'):
        output_path = '/' + output_path
    hdfs_output_url = base_url + output_path

    response = requests.get(hdfs_output_url, params={'op': 'OPEN'}, timeout=30)
    if response.status_code == 200:
        print('Job output:')
        print(response.text)
    else:
        print('Failed to retrieve job output.')

# Example usage
# Endpoints and job parameters; all values are placeholders for a real cluster.
rm_url = 'http://resourcemanager:8088'
hdfs_url = 'http://namenode:50070'
jar_path = 'myjob.jar'
main_class = 'com.myjob.Main'
input_path = '/input/data.txt'
output_path = '/output/result.txt'

# submit_hadoop_job() returns None on failure, in which case monitoring and
# output retrieval are skipped.
app_id = submit_hadoop_job(rm_url, jar_path, main_class, input_path, output_path)
if app_id:
    monitor_job_progress(rm_url, app_id)
    retrieve_job_output(hdfs_url, output_path)


In [None]:
#7. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, monitor its progress,
# and retrieve the final output.

import requests
import time

def submit_hadoop_job(rm_url, jar_path, main_class, input_path, output_path, vcores, memory):
    """Submit an application with resource requirements via the YARN REST API.

    Submission is a two-step protocol:

    1. ``POST /ws/v1/cluster/apps/new-application`` to obtain an application
       id.
    2. ``POST /ws/v1/cluster/apps`` with the submission context as JSON. The
       ResourceManager answers ``202 Accepted`` when the application was
       queued.

    The requested resources are sent in the top-level ``resource`` object of
    the submission context (fields ``memory`` and ``vCores``), which is where
    the ResourceManager reads them from; a ``resources`` entry nested inside
    ``am-container-spec`` is not part of the submission schema.

    Args:
        rm_url: Base URL of the ResourceManager, without a trailing slash.
        jar_path: Jar passed to ``hadoop jar``.
        main_class: Main class passed to ``hadoop jar``.
        input_path: Input path argument of the job.
        output_path: Output path argument of the job.
        vcores: Number of virtual cores to request.
        memory: Amount of memory to request (sent as ``resource.memory``).

    Returns:
        The application id string on success, ``None`` when either request
        is rejected.

    Raises:
        requests.RequestException: On connection problems or timeouts.
    """
    new_app_url = rm_url + '/ws/v1/cluster/apps/new-application'
    response = requests.post(new_app_url, timeout=30)
    if response.status_code != 200:
        # Without an application id there is nothing to submit.
        print('Job submission failed.')
        return None
    app_id = response.json()['application-id']

    submit_job_url = rm_url + '/ws/v1/cluster/apps'
    payload = {
        'application-id': app_id,
        'application-name': 'MyHadoopJob',
        'am-container-spec': {
            'commands': {
                # NOTE(review): this runs `hadoop jar` as the application
                # master command; confirm this is what the cluster expects.
                'command': 'hadoop jar {} {} {} {}'.format(jar_path, main_class, input_path, output_path)
            }
        },
        'resource': {
            'memory': memory,
            'vCores': vcores
        },
        'unmanaged-AM': False,
        'max-app-attempts': 1
    }

    response = requests.post(submit_job_url, json=payload, timeout=30)
    if response.status_code == 202:
        print('Job submitted successfully.')
        return app_id
    else:
        print('Job submission failed.')
        return None

def monitor_resource_usage(rm_url, app_id):
    """Poll an application and print its currently allocated resources.

    Every five seconds the application resource is fetched from the
    ResourceManager and its state, allocated virtual cores and allocated
    memory are printed. Polling stops once the state is ``FINISHED``,
    ``KILLED`` or ``FAILED``.

    The ``app`` object has no ``allocatedResources`` sub-object; the current
    allocation is exposed directly as ``allocatedVCores`` and ``allocatedMB``
    (``vcoreSeconds`` / ``memorySeconds`` are cumulative usage counters, not
    allocations).

    Args:
        rm_url: Base URL of the ResourceManager, without a trailing slash.
        app_id: Application id string.

    Returns:
        None. Status lines are written to standard output.
    """
    get_app_url = rm_url + '/ws/v1/cluster/apps/' + app_id
    while True:
        response = requests.get(get_app_url)
        app = response.json()['app']
        state = app['state']
        # .get() keeps polling alive if a field is absent in a given response;
        # the value is then printed as None.
        vcores = app.get('allocatedVCores')
        memory = app.get('allocatedMB')
        print('Job State: {}, Allocated vCores: {}, Allocated Memory: {}'.format(state, vcores, memory))

        if state in ['FINISHED', 'KILLED', 'FAILED']:
            break

        time.sleep(5)

# Example usage
# Endpoint and job parameters; all values are placeholders for a real cluster.
rm_url = 'http://resourcemanager:8088'
jar_path = 'myjob.jar'
main_class = 'com.myjob.Main'
input_path = '/input/data.txt'
output_path = '/output/result.txt'
# Resources passed to submit_hadoop_job() as its vcores and memory arguments.
vcores = 2
memory = 2048

# submit_hadoop_job() returns None on failure, in which case monitoring is skipped.
app_id = submit_hadoop_job(rm_url, jar_path, main_class, input_path, output_path, vcores, memory)
if app_id:
    monitor_resource_usage(rm_url, app_id)


In [None]:
# 8. Create a Python script that interacts with YARN's ResourceManager API to submit a Hadoop job, set resource
# requirements, and track resource usage during job execution.

import requests
import time

def submit_hadoop_job(rm_url, jar_path, main_class, input_path, output_path, vcores, memory):
    """Submit an application with resource requirements via the YARN REST API.

    Submission is a two-step protocol:

    1. ``POST /ws/v1/cluster/apps/new-application`` to obtain an application
       id.
    2. ``POST /ws/v1/cluster/apps`` with the submission context as JSON. The
       ResourceManager answers ``202 Accepted`` when the application was
       queued.

    The requested resources are sent in the top-level ``resource`` object of
    the submission context (fields ``memory`` and ``vCores``), which is where
    the ResourceManager reads them from; a ``resources`` entry nested inside
    ``am-container-spec`` is not part of the submission schema.

    Args:
        rm_url: Base URL of the ResourceManager, without a trailing slash.
        jar_path: Jar passed to ``hadoop jar``.
        main_class: Main class passed to ``hadoop jar``.
        input_path: Input path argument of the job.
        output_path: Output path argument of the job.
        vcores: Number of virtual cores to request.
        memory: Amount of memory to request (sent as ``resource.memory``).

    Returns:
        The application id string on success, ``None`` when either request
        is rejected.

    Raises:
        requests.RequestException: On connection problems or timeouts.
    """
    new_app_url = rm_url + '/ws/v1/cluster/apps/new-application'
    response = requests.post(new_app_url, timeout=30)
    if response.status_code != 200:
        # Without an application id there is nothing to submit.
        print('Job submission failed.')
        return None
    app_id = response.json()['application-id']

    submit_job_url = rm_url + '/ws/v1/cluster/apps'
    payload = {
        'application-id': app_id,
        'application-name': 'MyHadoopJob',
        'am-container-spec': {
            'commands': {
                # NOTE(review): this runs `hadoop jar` as the application
                # master command; confirm this is what the cluster expects.
                'command': 'hadoop jar {} {} {} {}'.format(jar_path, main_class, input_path, output_path)
            }
        },
        'resource': {
            'memory': memory,
            'vCores': vcores
        },
        'unmanaged-AM': False,
        'max-app-attempts': 1
    }

    response = requests.post(submit_job_url, json=payload, timeout=30)
    if response.status_code == 202:
        print('Job submitted successfully.')
        return app_id
    else:
        print('Job submission failed.')
        return None

def monitor_resource_usage(rm_url, app_id):
    """Poll an application and print its currently allocated resources.

    Every five seconds the application resource is fetched from the
    ResourceManager and its state, allocated virtual cores and allocated
    memory are printed. Polling stops once the state is ``FINISHED``,
    ``KILLED`` or ``FAILED``.

    The ``app`` object has no ``allocatedResources`` sub-object; the current
    allocation is exposed directly as ``allocatedVCores`` and ``allocatedMB``
    (``vcoreSeconds`` / ``memorySeconds`` are cumulative usage counters, not
    allocations).

    Args:
        rm_url: Base URL of the ResourceManager, without a trailing slash.
        app_id: Application id string.

    Returns:
        None. Status lines are written to standard output.
    """
    get_app_url = rm_url + '/ws/v1/cluster/apps/' + app_id
    while True:
        response = requests.get(get_app_url)
        app = response.json()['app']
        state = app['state']
        # .get() keeps polling alive if a field is absent in a given response;
        # the value is then printed as None.
        vcores = app.get('allocatedVCores')
        memory = app.get('allocatedMB')
        print('Job State: {}, Allocated vCores: {}, Allocated Memory: {}'.format(state, vcores, memory))

        if state in ['FINISHED', 'KILLED', 'FAILED']:
            break

        time.sleep(5)

# Example usage
# Endpoint and job parameters; all values are placeholders for a real cluster.
rm_url = 'http://resourcemanager:8088'
jar_path = 'myjob.jar'
main_class = 'com.myjob.Main'
input_path = '/input/data.txt'
output_path = '/output/result.txt'
# Resources passed to submit_hadoop_job() as its vcores and memory arguments.
vcores = 2
memory = 2048

# submit_hadoop_job() returns None on failure, in which case monitoring is skipped.
app_id = submit_hadoop_job(rm_url, jar_path, main_class, input_path, output_path, vcores, memory)
if app_id:
    monitor_resource_usage(rm_url, app_id)


In [None]:
#9. Write a Python program that compares the performance of a MapReduce job with different input split sizes, showcasing 
# the impact on overall job execution time.

import subprocess
import time

def _split_size_to_bytes(split_size):
    """Convert a size such as ``'64m'``, ``'1GB'`` or ``1024`` to an int byte count."""
    multipliers = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3, 't': 1024 ** 4}
    text = str(split_size).strip().lower()
    # Accept an optional trailing 'b' after the unit letter ('64mb' == '64m').
    if len(text) > 2 and text[-1] == 'b' and text[-2] in multipliers:
        text = text[:-1]
    if text and text[-1] in multipliers:
        return int(text[:-1]) * multipliers[text[-1]]
    return int(text)


def run_mapreduce_job(input_path, output_path, split_size):
    """Run the MapReduce job once and return its wall-clock duration.

    The job is started with ``hadoop jar myjob.jar com.myjob.Main`` and the
    minimum input split size is passed as
    ``-D mapreduce.input.fileinputformat.split.minsize=<bytes>``. Hadoop
    reads this property as a plain number of bytes, so human-readable sizes
    are converted first.

    Args:
        input_path: Input path handed to the job.
        output_path: Output path handed to the job.
        split_size: Minimum split size, either a number of bytes (int or
            numeric string) or a string with a ``k``/``m``/``g``/``t`` suffix
            (binary multiples), e.g. ``'64m'``.

    Returns:
        float: Seconds elapsed while the command ran. This is measured even
        when the job fails; a warning is printed in that case because the
        timing is then not comparable with successful runs.

    Raises:
        ValueError: If ``split_size`` cannot be interpreted as a size.
        OSError: If the ``hadoop`` executable cannot be started.
    """
    min_split_bytes = _split_size_to_bytes(split_size)
    command = ['hadoop', 'jar', 'myjob.jar', 'com.myjob.Main', '-D',
               'mapreduce.input.fileinputformat.split.minsize=' + str(min_split_bytes),
               input_path, output_path]
    start_time = time.time()
    completed = subprocess.run(command)
    end_time = time.time()
    execution_time = end_time - start_time
    if completed.returncode != 0:
        print('Warning: job exited with status {}; the measured time is not meaningful.'.format(
            completed.returncode))
    return execution_time

# Example usage
input_path = '/input/data.txt'
output_path = '/output/result.txt'
split_sizes = ['64m', '128m', '256m']

for split_size in split_sizes:
    execution_time = run_mapreduce_job(input_path, output_path, split_size)
    print('Split Size: {}, Execution Time: {} seconds'.format(split_size,
