In [None]:
import configparser

import os
import xml.etree.ElementTree as ET

# Path to the Hadoop configuration file (e.g., core-site.xml)
config_file = '/path/to/hadoop/conf/core-site.xml'


def read_hadoop_properties(path):
    """Parse a Hadoop ``*-site.xml`` file into a ``{name: value}`` dict.

    Hadoop configuration files are XML documents made of
    ``<property><name>..</name><value>..</value></property>`` elements, so an
    INI parser cannot read them; this walks the XML tree instead.

    Args:
        path: Filesystem path of the XML configuration file.

    Returns:
        dict mapping each property name to its value (both stripped of
        surrounding whitespace). A property without a ``<value>`` maps to ``''``.

    Raises:
        OSError: if the file cannot be opened.
        xml.etree.ElementTree.ParseError: if the file is not well-formed XML.
    """
    root = ET.parse(path).getroot()
    properties = {}
    for prop in root.iter('property'):
        name = prop.findtext('name')
        if name is None:
            # A <property> without a <name> carries no usable setting.
            continue
        properties[name.strip()] = (prop.findtext('value') or '').strip()
    return properties


if os.path.exists(config_file):
    config = read_hadoop_properties(config_file)
else:
    # Keep the cell runnable when the path is wrong, but say why nothing is listed.
    print(f"Configuration file not found: {config_file}")
    config = {}

# Display all properties
for key, value in config.items():
    print(f"{key} = {value}")


In [None]:
from hdfs import InsecureClient

def _format_size(num_bytes):
    """Render a byte count as a short human-readable string (e.g. ``1.50 KB``)."""
    units = ('B', 'KB', 'MB', 'GB', 'TB', 'PB')
    size = float(num_bytes)
    index = 0
    while size >= 1024 and index < len(units) - 1:
        size /= 1024
        index += 1
    if index == 0:
        return f"{int(size)} B"
    return f"{size:.2f} {units[index]}"


def calculate_total_file_size(hdfs_url, directory_path):
    """Sum the sizes of the entries directly inside an HDFS directory.

    Args:
        hdfs_url: WebHDFS base URL passed to ``InsecureClient``.
        directory_path: HDFS directory whose direct children are measured
            (the listing is not recursive).

    Returns:
        Tuple ``(total_size, total_size_formatted)``: the total in bytes and
        the same value rendered as a human-readable string.
    """
    # Create an HDFS client
    client = InsecureClient(hdfs_url)

    # With status=True the client returns (name, FileStatus) pairs, so the
    # status dict has to be unpacked before reading its 'length'.
    entries = client.list(directory_path, status=True)

    # Calculate the total file size
    total_size = sum(status['length'] for _name, status in entries)

    # Convert the size to a human-readable format
    total_size_formatted = _format_size(total_size)

    return total_size, total_size_formatted

# Example usage: point the helper at a WebHDFS endpoint and a directory.
hdfs_url = "http://localhost:50070"
directory_path = "/user/example/directory"

# The helper returns a pair; unpack it into the two reported values.
total_size, total_size_formatted = calculate_total_file_size(
    hdfs_url=hdfs_url,
    directory_path=directory_path,
)
print("Total File Size:", total_size)
print("Total File Size Formatted:", total_size_formatted)

In [None]:
%pip install hdfs

In [4]:
%pip install mrjob==0.7.4

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
     -------------------------------------- 439.6/439.6 kB 2.8 MB/s eta 0:00:00
Installing collected packages: mrjob
Successfully installed mrjob-0.7.4
Note: you may need to restart the kernel to use updated packages.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

class TopNWords(MRJob):
    """Two-step job: count every word, then emit the N highest counts.

    ``N`` is looked up as a module-level name when the final reducer runs.
    """

    def mapper_extract_words(self, _, line):
        """Yield ``(word, 1)`` for each lower-cased word token in ``line``."""
        for match in re.finditer(r'\w+', line.lower()):
            yield match.group(), 1

    def combiner_count_words(self, word, counts):
        """Pre-aggregate the counts for ``word`` before they reach the reducer."""
        partial_total = sum(counts)
        yield word, partial_total

    def reducer_count_words(self, word, counts):
        """Emit ``(total, word)`` under a single ``None`` key for the next step."""
        total = sum(counts)
        yield None, (total, word)

    def reducer_find_top_n(self, _, word_count_pairs):
        """Order all ``(count, word)`` pairs descending and yield the first N."""
        ranked = list(word_count_pairs)
        ranked.sort(reverse=True)
        for count, word in ranked[:N]:
            yield word, count

    def steps(self):
        """Return the counting step followed by the top-N selection step."""
        counting_step = MRStep(
            mapper=self.mapper_extract_words,
            combiner=self.combiner_count_words,
            reducer=self.reducer_count_words,
        )
        selection_step = MRStep(reducer=self.reducer_find_top_n)
        return [counting_step, selection_step]

# Set the value of N for the top N most frequent words.
# TopNWords.reducer_find_top_n reads this as a module-level name, so it must
# be defined before the job actually runs.
N = 10

# Run the MapReduce job, but only when this code is executed as the main module.
if __name__ == '__main__':
    TopNWords.run()


In [None]:
%pip install requests

In [None]:
import requests

# Hadoop cluster information
namenode_host = 'namenode_hostname'
namenode_port = '50070'
datanode_port = '50075'


def fetch_jmx_state(url, timeout=10):
    """Return the ``State`` attribute of the first JMX bean served at ``url``.

    A health check should report an unreachable daemon instead of dying with
    a traceback, so connection/HTTP/JSON failures are turned into a status
    string rather than propagated.

    Args:
        url: Full JMX query URL.
        timeout: Seconds to wait for the HTTP response.

    Returns:
        The bean's ``State`` value, or a string starting with ``UNREACHABLE``
        or ``UNKNOWN`` describing why no state could be read.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()
        beans = response.json().get('beans', [])
    except (requests.RequestException, ValueError) as exc:
        return f"UNREACHABLE ({exc})"

    if not beans:
        # The query matched no bean (wrong daemon, wrong bean name, ...).
        return "UNKNOWN (no matching JMX bean)"
    return beans[0].get('State', "UNKNOWN (bean has no State attribute)")


# Check NameNode health
namenode_url = f"http://{namenode_host}:{namenode_port}/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"
namenode_health = fetch_jmx_state(namenode_url)

# Check DataNode health
# NOTE(review): this queries the DataNode port on the NameNode host; confirm a
# DataNode really runs there, and that its DataNodeInfo bean exposes 'State'.
datanode_url = f"http://{namenode_host}:{datanode_port}/jmx?qry=Hadoop:service=DataNode,name=DataNodeInfo"
datanode_health = fetch_jmx_state(datanode_url)

# Print health status
print(f"NameNode Health: {namenode_health}")
print(f"DataNode Health: {datanode_health}")


In [None]:
from hdfs import InsecureClient

def list_hdfs_path(hdfs_url, hdfs_path):
    """Print every entry directly under ``hdfs_path`` with its kind.

    Args:
        hdfs_url: WebHDFS base URL passed to ``InsecureClient``.
        hdfs_path: HDFS directory to list (not recursive).

    Prints one ``Directory: <name>`` or ``File: <name>`` line per entry.
    """
    # Create an HDFS client
    client = InsecureClient(hdfs_url)

    # With status=True each item is a (name, FileStatus) pair; the name is the
    # first element and the status dict carries the entry 'type'.
    for entry_name, entry_status in client.list(hdfs_path, status=True):
        file_type = 'Directory' if entry_status['type'] == 'DIRECTORY' else 'File'
        print(f"{file_type}: {entry_name}")

# Example usage: list one directory through the local WebHDFS endpoint.
hdfs_url = "http://localhost:50070"
hdfs_path = "/user/example/directory"

list_hdfs_path(hdfs_url=hdfs_url, hdfs_path=hdfs_path)


In [None]:
import requests

def get_datanode_storage_utilization(namenode_host, namenode_port):
    """Fetch per-DataNode capacity and usage from the NameNode JMX endpoint.

    Args:
        namenode_host: Hostname of the NameNode web server.
        namenode_port: Port of the NameNode web server.

    Returns:
        dict mapping each live DataNode's name to
        ``{'capacity': ..., 'used': ..., 'utilization': <percent, 2 decimals>}``.
        A node reporting zero capacity gets ``utilization`` 0.0.

    Raises:
        requests.RequestException: on connection failure, timeout or HTTP error.
    """
    import json

    # Get the list of DataNodes from the Hadoop Namenode's JMX endpoint
    namenode_url = f"http://{namenode_host}:{namenode_port}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
    response = requests.get(namenode_url, timeout=10)
    response.raise_for_status()
    datanodes = response.json()['beans'][0]['LiveNodes']

    # The NameNodeInfo bean publishes LiveNodes as a JSON document embedded in
    # a string, so decode it before treating it as a mapping.
    if isinstance(datanodes, str):
        datanodes = json.loads(datanodes)

    # Calculate storage utilization for each DataNode
    storage_utilization = {}
    for node_key, datanode in datanodes.items():
        capacity = datanode['capacity']
        used = datanode['used']
        # The mapping key identifies the node; honour an explicit 'name' if present.
        node_name = datanode.get('name', node_key)
        storage_utilization[node_name] = {
            'capacity': capacity,
            'used': used,
            # Guard against nodes that report no capacity at all.
            'utilization': round(used / capacity * 100, 2) if capacity else 0.0
        }

    return storage_utilization

def analyze_storage_utilization(storage_utilization):
    """Pick the DataNodes with the largest and smallest capacity.

    Args:
        storage_utilization: dict of ``name -> stats`` where each stats dict
            has a ``'capacity'`` entry.

    Returns:
        ``(highest, lowest)``, each a ``(name, stats)`` pair.
    """
    def capacity_of(entry):
        _name, stats = entry
        return stats['capacity']

    # Order ascending by capacity: the tail is the largest, the head the smallest.
    ranked = list(storage_utilization.items())
    ranked.sort(key=capacity_of)

    highest_capacity_node = ranked[-1]
    lowest_capacity_node = ranked[0]
    return highest_capacity_node, lowest_capacity_node

# Hadoop cluster information
namenode_host = 'namenode_hostname'
namenode_port = '50070'

# Get storage utilization of DataNodes
storage_utilization = get_datanode_storage_utilization(
    namenode_host=namenode_host,
    namenode_port=namenode_port,
)

# Analyze storage utilization and find highest and lowest capacity nodes
highest_capacity_node, lowest_capacity_node = analyze_storage_utilization(storage_utilization)

# Each result is a (name, stats) pair; split them for the report below.
highest_name, highest_stats = highest_capacity_node
lowest_name, lowest_stats = lowest_capacity_node

# Print the results
print(f"Highest Capacity Node: {highest_name}")
print(f"Capacity: {highest_stats['capacity']} bytes")
print(f"Utilization: {highest_stats['utilization']}%")
print()
print(f"Lowest Capacity Node: {lowest_name}")
print(f"Capacity: {lowest_stats['capacity']} bytes")
print(f"Utilization: {lowest_stats['utilization']}%")


In [None]:
import requests
import time

# YARN ResourceManager information.
# These module-level values are interpolated into every request URL built by
# the functions in this cell.
# NOTE(review): the host looks like a placeholder; set it to the real
# ResourceManager address before running.
resourcemanager_host = 'resourcemanager_hostname'
resourcemanager_port = '8088'

# Submit a Hadoop job
def submit_hadoop_job(job_name, jar_path, input_path, output_path):
    """Submit an application to the YARN ResourceManager REST API.

    Submission is a two-step protocol: an application id is first reserved
    through ``new-application``, then the submission context carrying that id
    is POSTed. The submit call answers without a JSON body, so the id returned
    here is the one obtained in the first step.

    Args:
        job_name: Application name shown by YARN.
        jar_path: Jar location used in the launch command and as the
            localized resource.
        input_path: Input path passed to the launch command.
        output_path: Output path passed to the launch command.

    Returns:
        The application id string reserved for this submission.

    Raises:
        requests.RequestException: on connection failure, timeout or an
            HTTP error status from either call.
    """
    url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps"
    headers = {'Content-Type': 'application/json'}

    # Step 1: ask the ResourceManager for a fresh application id.
    new_app_response = requests.post(f"{url}/new-application", headers=headers, timeout=30)
    new_app_response.raise_for_status()
    application_id = new_app_response.json()['application-id']

    # Step 2: the submission context is the top-level JSON object (no wrapper
    # key) and must carry the reserved id.
    # NOTE(review): YARN localizes 'resource' from a filesystem URI and checks
    # its size/timestamp; jar_path and size -1 are passed through unchanged -
    # confirm these values against the target cluster.
    data = {
        "application-id": application_id,
        "application-name": job_name,
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_path} input {input_path} output {output_path}"
            },
            "local-resources": {
                "entry": [
                    {
                        "key": "job.jar",
                        "value": {
                            "resource": jar_path,
                            "type": "FILE",
                            "visibility": "APPLICATION",
                            "size": -1
                        }
                    }
                ]
            }
        },
        "application-type": "MAPREDUCE"
    }

    response = requests.post(url, json=data, headers=headers, timeout=30)
    response.raise_for_status()
    return application_id

# Monitor job progress
def monitor_job_progress(job_id, poll_interval=10):
    """Poll the ResourceManager until the application reaches a final state.

    Args:
        job_id: YARN application id to watch.
        poll_interval: Seconds to wait between polls (default 10).

    Returns:
        The final state: ``'FINISHED'``, ``'FAILED'`` or ``'KILLED'``.

    Raises:
        requests.RequestException: on connection failure, timeout or an
            HTTP error status, instead of looping on an unusable response.
    """
    url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps/{job_id}"
    # A GET has no body to describe; ask for a JSON answer instead.
    headers = {'Accept': 'application/json'}
    terminal_states = ('FINISHED', 'FAILED', 'KILLED')

    while True:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        app_state = response.json()['app']['state']

        if app_state in terminal_states:
            return app_state

        print(f"Job status: {app_state}")
        time.sleep(poll_interval)

# Retrieve final output
def retrieve_output(output_path, webhdfs_host=None, webhdfs_port='50070'):
    """Read a file's contents through the WebHDFS ``OPEN`` operation.

    WebHDFS is served by the NameNode web server, not by the ResourceManager
    REST port, so the request goes to ``webhdfs_port``.

    Args:
        output_path: Absolute HDFS path to read.
        webhdfs_host: Host serving WebHDFS. Defaults to ``resourcemanager_host``.
        webhdfs_port: NameNode HTTP port (default ``'50070'``).

    Returns:
        The response body as text.

    Raises:
        requests.RequestException: on connection failure, timeout or an
            HTTP error status.
    """
    # NOTE(review): OPEN reads a single file; if output_path is a job output
    # directory, the individual part files need to be requested instead.
    host = webhdfs_host or resourcemanager_host
    url = f"http://{host}:{webhdfs_port}/webhdfs/v1{output_path}?op=OPEN"
    response = requests.get(url, timeout=30)
    response.raise_for_status()

    return response.text

# Example usage: describe the job to launch.
job_name = "MyHadoopJob"
jar_path = "hadoop_job.jar"
input_path = "/user/example/input"
output_path = "/user/example/output"

# Submit, then block until the application reaches a final state.
job_id = submit_hadoop_job(
    job_name=job_name,
    jar_path=jar_path,
    input_path=input_path,
    output_path=output_path,
)
print(f"Job submitted with ID: {job_id}")

app_state = monitor_job_progress(job_id=job_id)
print(f"Job status: {app_state}")

# Only a finished job has output worth fetching.
if app_state == 'FINISHED':
    final_output = retrieve_output(output_path=output_path)
    print("Final Output:", final_output, sep="\n")


In [None]:
import requests
import time

# YARN ResourceManager information.
# Read as module-level names by submit_hadoop_job and monitor_job_progress in
# this cell when they build their request URLs.
# NOTE(review): the host looks like a placeholder; set it to the real
# ResourceManager address before running.
resourcemanager_host = 'resourcemanager_hostname'
resourcemanager_port = '8088'

# Submit a Hadoop job with resource requirements
def submit_hadoop_job(job_name, jar_path, input_path, output_path, memory_mb, vcores):
    """Submit an application with resource requirements to the YARN REST API.

    Submission is a two-step protocol: an application id is first reserved
    through ``new-application``, then the submission context carrying that id
    is POSTed. The submit call answers without a JSON body, so the id returned
    here is the one obtained in the first step.

    Args:
        job_name: Application name shown by YARN.
        jar_path: Jar location used in the launch command and as the
            localized resource.
        input_path: Input path passed to the launch command.
        output_path: Output path passed to the launch command.
        memory_mb: Memory requested for the application master, in MB.
        vcores: Virtual cores requested for the application master.

    Returns:
        The application id string reserved for this submission.

    Raises:
        requests.RequestException: on connection failure, timeout or an
            HTTP error status from either call.
    """
    url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps"
    headers = {'Content-Type': 'application/json'}

    # Step 1: ask the ResourceManager for a fresh application id.
    new_app_response = requests.post(f"{url}/new-application", headers=headers, timeout=30)
    new_app_response.raise_for_status()
    application_id = new_app_response.json()['application-id']

    # Step 2: the submission context is the top-level JSON object (no wrapper
    # key), carries the reserved id, and states the resource request once at
    # the top level.
    # NOTE(review): YARN localizes 'resource' from a filesystem URI and checks
    # its size/timestamp; jar_path and size -1 are passed through unchanged -
    # confirm these values against the target cluster.
    data = {
        "application-id": application_id,
        "application-name": job_name,
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_path} input {input_path} output {output_path}"
            },
            "local-resources": {
                "entry": [
                    {
                        "key": "job.jar",
                        "value": {
                            "resource": jar_path,
                            "type": "FILE",
                            "visibility": "APPLICATION",
                            "size": -1
                        }
                    }
                ]
            }
        },
        "application-type": "MAPREDUCE",
        "resource": {
            "memory": memory_mb,
            "vCores": vcores
        }
    }

    response = requests.post(url, json=data, headers=headers, timeout=30)
    response.raise_for_status()
    return application_id

# Monitor job progress and track resource usage
def monitor_job_progress(job_id, poll_interval=10):
    """Poll the ResourceManager, reporting state and allocated resources.

    Args:
        job_id: YARN application id to watch.
        poll_interval: Seconds to wait between polls (default 10).

    Returns:
        The final state: ``'FINISHED'``, ``'FAILED'`` or ``'KILLED'``.

    Raises:
        requests.RequestException: on connection failure, timeout or an
            HTTP error status, instead of looping on an unusable response.
    """
    url = f"http://{resourcemanager_host}:{resourcemanager_port}/ws/v1/cluster/apps/{job_id}"
    # A GET has no body to describe; ask for a JSON answer instead.
    headers = {'Accept': 'application/json'}
    terminal_states = ('FINISHED', 'FAILED', 'KILLED')

    while True:
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        app_info = response.json()['app']
        app_state = app_info['state']
        app_memory = app_info['allocatedMB']
        app_vcores = app_info['allocatedVCores']

        if app_state in terminal_states:
            return app_state

        print(f"Job status: {app_state}")
        print(f"Allocated Memory: {app_memory} MB")
        print(f"Allocated vCores: {app_vcores}")
        time.sleep(poll_interval)

# Example usage: describe the job and the resources it should request.
job_name = "MyHadoopJob"
jar_path = "hadoop_job.jar"
input_path = "/user/example/input"
output_path = "/user/example/output"
memory_mb = 1024
vcores = 2

# Submit, then block until the application reaches a final state.
job_id = submit_hadoop_job(
    job_name=job_name,
    jar_path=jar_path,
    input_path=input_path,
    output_path=output_path,
    memory_mb=memory_mb,
    vcores=vcores,
)
print(f"Job submitted with ID: {job_id}")

app_state = monitor_job_progress(job_id=job_id)
print(f"Job status: {app_state}")


In [None]:
from mrjob.job import MRJob
import time

class PerformanceComparisonJob(MRJob):
    """Single-step job used to time a run: every line goes to one reducer key."""

    def configure_args(self):
        """Register the ``--split-size`` pass-through option (int, default 1000)."""
        super().configure_args()
        # NOTE(review): the option is declared but nothing in this class reads it.
        self.add_passthru_arg('--split-size', type=int, default=1000,
                              help='Input split size')

    def mapper(self, _, line):
        """Forward each input line under the single key ``None``."""
        yield None, line

    def reducer(self, _, lines):
        """Sleep briefly to stand in for real work, then emit one empty record."""
        time.sleep(0.1)  # Simulate processing time
        yield None, None

    def steps(self):
        """Return the job's single map/reduce step.

        Steps are built with ``MRStep``: the older ``self.mr(...)`` helper was
        deprecated in its favour and is not available in current mrjob releases.
        """
        # Imported here so the class does not depend on another cell's imports.
        from mrjob.step import MRStep

        return [
            MRStep(mapper=self.mapper, reducer=self.reducer)
        ]

if __name__ == '__main__':
    performance_job = PerformanceComparisonJob()
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments while the job runs.
    start_time = time.perf_counter()
    performance_job.run_job()
    end_time = time.perf_counter()
    execution_time = end_time - start_time
    print(f"Execution Time: {execution_time} seconds")
