In [None]:
import xml.etree.ElementTree as ET

def get_hadoop_components(file_path):
    """Collect the address part of every ``fs.defaultFS`` property in a Hadoop XML config.

    Args:
        file_path: Path to the XML configuration file to read.

    Returns:
        list[str]: For each ``fs.defaultFS`` property, the text that follows
        the first ``://`` in its value (``"host:9000"`` for
        ``"hdfs://host:9000"``), or the whole value when it carries no scheme.
        The list is empty when the file is missing, is not well-formed XML,
        or defines no such property; the first two cases also print a message.
    """
    components = []
    try:
        root = ET.parse(file_path).getroot()
    except FileNotFoundError:
        print(f"File '{file_path}' not found.")
        return components
    except ET.ParseError:
        print(f"Error parsing the file '{file_path}'.")
        return components

    for property_elem in root.iter('property'):
        name_elem = property_elem.find('name')
        value_elem = property_elem.find('value')
        if name_elem is None or value_elem is None:
            continue

        # .text is None for an empty element such as <value></value>
        name = (name_elem.text or '').strip()
        value = (value_elem.text or '').strip()
        if name != "fs.defaultFS" or not value:
            continue

        # Drop the scheme when there is one; keep bare "host:port" values as-is
        parts = value.split("://", 1)
        components.append(parts[1] if len(parts) == 2 else value)

    return components


# Location of the core-site.xml to inspect (placeholder -- point it at a real file)
config_file_path = "/path/to/core-site.xml"

components = get_hadoop_components(config_file_path)

# An empty list means nothing was extracted; otherwise print one entry per line
if not components:
    print("No core components found.")
else:
    print("Core components of Hadoop:")
    for component in components:
        print(component)


In [None]:
from hdfs import InsecureClient

def calculate_directory_size(hdfs_host, hdfs_port, hdfs_user, directory_path):
    """Sum the sizes of the files that sit directly inside an HDFS directory.

    Sub-directories are not descended into; only entries whose status type is
    ``'FILE'`` contribute to the total.

    Args:
        hdfs_host: Host used to build the ``http://host:port`` client URL.
        hdfs_port: Port used to build the client URL.
        hdfs_user: User name passed to ``InsecureClient``.
        directory_path: HDFS directory whose immediate files are measured.

    Returns:
        int: Total of the ``length`` fields of the directory's files (0 when
        the directory holds no files).
    """
    client = InsecureClient(f"http://{hdfs_host}:{hdfs_port}", user=hdfs_user)

    # With status=True the client yields (name, status_dict) pairs rather than
    # plain dicts, so each entry has to be unpacked before reading its fields.
    return sum(
        status['length']
        for _name, status in client.list(directory_path, status=True)
        if status['type'] == 'FILE'
    )


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import heapq


class TopNWords(MRJob):
    """Two-step MapReduce job that emits the N most frequent words of its input.

    Step 1 counts each lower-cased, whitespace-delimited word. Its reducer
    re-keys every ``(count, word)`` pair under the single key ``None`` so that
    step 2 receives all pairs in one reducer call and can keep the N largest.
    """

    def configure_args(self):
        """Register ``--top`` (how many words to emit; defaults to 10)."""
        super(TopNWords, self).configure_args()
        # A default keeps heapq.nlargest from receiving None when --top is omitted
        self.add_passthru_arg('--top', type=int, default=10,
                              help='Number of top words to display')

    def steps(self):
        """Return the two steps: count words, then select the top N."""
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_top_n_words)
        ]

    def mapper_get_words(self, _, line):
        """Yield ``(word, 1)`` for every whitespace-separated token of ``line``."""
        for word in line.split():
            yield word.lower(), 1

    def combiner_count_words(self, word, counts):
        """Pre-aggregate the counts of one word on the mapper side."""
        yield word, sum(counts)

    def reducer_count_words(self, word, counts):
        """Yield the word's total as ``(None, (count, word))``."""
        # The shared None key routes every pair to the same step-2 reducer call;
        # count comes first so pairs order by frequency.
        yield None, (sum(counts), word)

    def reducer_find_top_n_words(self, _, count_word_pairs):
        """Yield ``(word, count)`` for the ``--top`` largest pairs, biggest first."""
        for count, word in heapq.nlargest(self.options.top, count_word_pairs):
            yield word, count


# Entry point: let the job class parse the command line and run itself
if __name__ == "__main__":
    TopNWords.run()


In [None]:
import requests

def check_namenode_health(namenode_url, timeout=10):
    """Print the NameNode's SafeMode status and its live/dead DataNode reports.

    Args:
        namenode_url: Base URL of the NameNode HTTP server, without a trailing
            slash (``/jmx?...`` is appended to it).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        None. All results and errors are reported with ``print``.
    """
    # Safemode, LiveNodes and DeadNodes are attributes of the NameNodeInfo
    # bean; the NameNodeStatus bean does not expose them.
    url = f"{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # An unreachable NameNode is itself a health finding, not a crash
        print(f"Error checking NameNode health: {exc}")
        return

    if response.status_code != 200:
        print("Error checking NameNode health.")
        return

    # A query that matches no bean may still come back as 200 with an empty list
    beans = response.json().get('beans') or []
    if not beans:
        print("Error checking NameNode health.")
        return

    info = beans[0]

    # Safemode is truthy (a non-empty description) only while SafeMode is on
    if info.get('Safemode'):
        print("NameNode is in SafeMode.")
    else:
        print("NameNode is active.")

    print(f"Live Nodes: {info.get('LiveNodes')}")
    print(f"Dead Nodes: {info.get('DeadNodes')}")

def check_datanode_health(namenode_url, timeout=10):
    """Print how many DataNodes the NameNode currently reports as live.

    Args:
        namenode_url: Base URL of the NameNode HTTP server, without a trailing
            slash (``/jmx?...`` is appended to it).
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        None. The count or an error message is reported with ``print``.
    """
    # The request goes to the NameNode, so it must ask for a NameNode bean:
    # FSNamesystemState carries the cluster-wide NumLiveDataNodes counter.
    url = f"{namenode_url}/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        print(f"Error checking DataNode health: {exc}")
        return

    if response.status_code != 200:
        print("Error checking DataNode health.")
        return

    beans = response.json().get('beans') or []
    if not beans or 'NumLiveDataNodes' not in beans[0]:
        # Nothing matched, or the bean lacks the expected attribute
        print("Error checking DataNode health.")
        return

    datanode_count = beans[0]['NumLiveDataNodes']
    print(f"Number of live DataNodes: {datanode_count}")

# Base URL of the NameNode web UI that both health probes will query
namenode_url = "http://localhost:9870"

# Run the NameNode probe first, then the DataNode probe, against the same URL
for health_check in (check_namenode_health, check_datanode_health):
    health_check(namenode_url)


In [None]:
from hdfs import InsecureClient

def list_hdfs_path(hdfs_host, hdfs_port, hdfs_user, hdfs_path):
    """Print and return the entries found directly under an HDFS path.

    Args:
        hdfs_host: Host used to build the ``http://host:port`` client URL.
        hdfs_port: Port used to build the client URL.
        hdfs_user: User name passed to ``InsecureClient``.
        hdfs_path: HDFS directory to list.

    Returns:
        The value returned by ``client.list(hdfs_path, status=True)``: a
        sequence of ``(name, status_dict)`` pairs, one per entry.
    """
    client = InsecureClient(f"http://{hdfs_host}:{hdfs_port}", user=hdfs_user)

    contents = client.list(hdfs_path, status=True)

    # Each entry is a (name, status) pair; the kind lives in status['type']
    for item_name, status in contents:
        item_type = status['type']
        # Entries of any other type are returned but not printed
        if item_type in ('DIRECTORY', 'FILE'):
            print(f"[{item_type}] {item_name}")

    return contents


# HDFS connection details: host, port and the user to act as
hdfs_host, hdfs_port, hdfs_user = "localhost", 50070, "hadoop"

# Directory whose entries should be listed
hdfs_path = "/user/hadoop/data"

# Print the entries; the returned listing is the cell's displayed value
list_hdfs_path(hdfs_host, hdfs_port, hdfs_user, hdfs_path)


In [None]:
from hdfs import InsecureClient

def analyze_storage_utilization(hdfs_host, hdfs_port, hdfs_user):
    # Create an HDFS client
    client = InsecureClient(f"http://{hdfs_host}:{hdfs_port}", user=hdfs_user)

    # Get the storage utilization information for all DataNodes
    datanodes = client.list('/datanodes', status=True)

    # Analyze storage utilization
    max_storage_node = None
    max_storage_capacity = 0
    min_storage_node = None
    min_storage_capacity = float('inf')

    for datanode in datanodes:
        node_name = datanode['path']
        storage_capacity = datanode['length']

        if storage_capacity > max_storage_capacity:
            max_storage_capacity = storage_capacity
            max_storage_node = node_name

        if storage_capacity < min_storage_capacity:
            min_storage_capacity = storage_capacity
            min_storage_node = node_name

    # Display the results
    print("Storage Utilization Analysis:")
    print(f"Highest Storage Capacity: {max_storage_node} - {max_storage_capacity} bytes")
    print(f"Lowest Storage Capacity: {min_storage_node} - {min_storage_capacity} bytes")


# HDFS connection details: host, port and the user to act as
hdfs_host, hdfs_port, hdfs_user = "localhost", 50070, "hadoop"

# Report the entries with the highest and lowest storage capacity
analyze_storage_utilization(hdfs_host, hdfs_port, hdfs_user)


In [None]:
import requests
import time

def submit_yarn_job(resource_manager_url, job_name, jar_file, main_class, input_path, output_path,
                    memory_mb=1024, vcores=1):
    """Submit an application to YARN through the ResourceManager REST API.

    Submission takes two requests: ``POST .../apps/new-application`` reserves
    an application id, then ``POST .../apps`` submits the application context
    carrying that id.

    Args:
        resource_manager_url: Base URL of the ResourceManager, no trailing slash.
        job_name: Value sent as ``application-name``.
        jar_file: Jar passed to the ``hadoop jar`` launch command.
        main_class: Main class passed to the launch command.
        input_path: Input path passed to the launch command.
        output_path: Output path passed to the launch command.
        memory_mb: Memory requested in ``resource.memory`` (default 1024).
        vcores: Virtual cores requested in ``resource.vCores`` (default 1).

    Returns:
        str | None: The application id when the submission is accepted
        (HTTP 202), otherwise ``None`` after printing an error.
    """
    apps_url = f"{resource_manager_url}/ws/v1/cluster/apps"

    # Step 1: reserve an application id
    response = requests.post(f"{apps_url}/new-application", timeout=30)
    if response.status_code != 200:
        print("Error creating new application.")
        return None
    application_id = response.json()["application-id"]

    # Step 2: submit the application context to the apps collection itself;
    # the id travels in the body, not in the URL.
    # Only keys from the documented submission schema are sent.
    payload = {
        "application-id": application_id,
        "application-name": job_name,
        "application-type": "MAPREDUCE",
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_file} {main_class} {input_path} {output_path}"
            }
        },
        "max-app-attempts": 2,
        "resource": {
            "memory": memory_mb,
            "vCores": vcores
        },
        "queue": "default"
    }
    response = requests.post(apps_url, json=payload, timeout=30)
    if response.status_code != 202:
        print("Error submitting job.")
        return None

    print("Job submitted successfully.")
    return application_id

def monitor_job_progress(resource_manager_url, application_id, poll_interval=5, max_errors=5):
    """Poll the ResourceManager until the application reaches a terminal state.

    Args:
        resource_manager_url: Base URL of the ResourceManager, no trailing slash.
        application_id: Id of the application to watch.
        poll_interval: Seconds to sleep between polls (default 5).
        max_errors: Number of consecutive non-200 responses tolerated before
            monitoring is abandoned (default 5).

    Returns:
        str | None: ``"FINISHED"``, ``"FAILED"`` or ``"KILLED"`` once the
        application stops, or ``None`` when monitoring was abandoned after
        ``max_errors`` consecutive failed requests.
    """
    job_info_url = f"{resource_manager_url}/ws/v1/cluster/apps/{application_id}"
    consecutive_errors = 0

    while True:
        response = requests.get(job_info_url, timeout=30)

        if response.status_code == 200:
            consecutive_errors = 0
            app = response.json()["app"]
            state = app["state"]

            print(f"Job state: {state}")
            print(f"Job progress: {app['progress']}%")

            if state == "FINISHED":
                print("Job finished.")
                # FINISHED only says the application stopped; finalStatus says
                # whether it actually succeeded.
                final_status = app.get("finalStatus")
                if final_status:
                    print(f"Final status: {final_status}")
                return state
            if state in ("KILLED", "FAILED"):
                print("Job failed or terminated.")
                return state
        else:
            # Surface the failure instead of polling silently forever
            consecutive_errors += 1
            print(f"Error fetching job status (HTTP {response.status_code}).")
            if consecutive_errors >= max_errors:
                print("Giving up on monitoring the job.")
                return None

        time.sleep(poll_interval)

def retrieve_output(resource_manager_url, application_id):
    """Fetch and print the output path reported for a finished job.

    Args:
        resource_manager_url: Base URL of the ResourceManager, no trailing slash.
        application_id: Id of the application whose output is requested.

    Returns:
        The value found at ``jobOutput.outputPath`` in the response, or
        ``None`` (after printing an error) when the request fails or the
        response does not have that shape.
    """
    # NOTE(review): ".../apps/{id}/job/output" does not appear among the
    # documented ResourceManager REST endpoints -- confirm that the target
    # cluster really serves it before relying on this function.
    output_url = f"{resource_manager_url}/ws/v1/cluster/apps/{application_id}/job/output"
    response = requests.get(output_url, timeout=30)

    if response.status_code != 200:
        print("Error retrieving job output.")
        return None

    try:
        output_path = response.json()["jobOutput"]["outputPath"]
    except (ValueError, KeyError, TypeError):
        # Body was not JSON, or lacked the expected jobOutput/outputPath keys
        print("Error retrieving job output.")
        return None

    print(f"Output path: {output_path}")
    return output_path

# Address of the YARN ResourceManager
resource_manager_url = "http://localhost:8088"

# Placeholder job description -- replace every value before running
job_name = "MyHadoopJob"
jar_file = "path_to_your_hadoop_jar_file"
main_class = "your_hadoop_main_class"
input_path, output_path = "input_path_in_hdfs", "output_path_in_hdfs"

# Hand the job to YARN; a falsy id means the submission did not go through
application_id = submit_yarn_job(
    resource_manager_url,
    job_name,
    jar_file,
    main_class,
    input_path,
    output_path,
)

if application_id:
    # Follow the job until it stops, then look up where its output went
    monitor_job_progress(resource_manager_url, application_id)
    retrieve_output(resource_manager_url, application_id)


In [None]:
import requests
import time

def submit_yarn_job(resource_manager_url, job_name, jar_file, main_class, input_path, output_path,
                    memory_mb=1024, vcores=1):
    """Submit an application to YARN with an explicit resource request.

    Submission takes two requests: ``POST .../apps/new-application`` reserves
    an application id, then ``POST .../apps`` submits the application context
    carrying that id.

    Args:
        resource_manager_url: Base URL of the ResourceManager, no trailing slash.
        job_name: Value sent as ``application-name``.
        jar_file: Jar passed to the ``hadoop jar`` launch command.
        main_class: Main class passed to the launch command.
        input_path: Input path passed to the launch command.
        output_path: Output path passed to the launch command.
        memory_mb: Memory requested in ``resource.memory`` (default 1024).
        vcores: Virtual cores requested in ``resource.vCores`` (default 1).

    Returns:
        str | None: The application id when the submission is accepted
        (HTTP 202), otherwise ``None`` after printing an error.
    """
    apps_url = f"{resource_manager_url}/ws/v1/cluster/apps"

    # Step 1: reserve an application id
    response = requests.post(f"{apps_url}/new-application", timeout=30)
    if response.status_code != 200:
        print("Error creating new application.")
        return None
    application_id = response.json()["application-id"]

    # Step 2: submit the application context to the apps collection itself;
    # the id travels in the body, not in the URL.
    # Only keys from the documented submission schema are sent.
    payload = {
        "application-id": application_id,
        "application-name": job_name,
        "application-type": "MAPREDUCE",
        "am-container-spec": {
            "commands": {
                "command": f"hadoop jar {jar_file} {main_class} {input_path} {output_path}"
            }
        },
        "max-app-attempts": 2,
        "resource": {
            "memory": memory_mb,
            "vCores": vcores
        },
        "queue": "default"
    }
    response = requests.post(apps_url, json=payload, timeout=30)
    if response.status_code != 202:
        print("Error submitting job.")
        return None

    print("Job submitted successfully.")
    return application_id

def track_resource_usage(resource_manager_url, application_id, poll_interval=5, max_errors=5):
    """Print an application's allocated memory and vCores until it stops.

    The figures are read from the application resource
    (``/ws/v1/cluster/apps/{id}``), whose ``allocatedMB`` and
    ``allocatedVCores`` fields report the current allocation.

    Args:
        resource_manager_url: Base URL of the ResourceManager, no trailing slash.
        application_id: Id of the application to watch.
        poll_interval: Seconds to sleep between polls (default 5).
        max_errors: Number of consecutive non-200 responses tolerated before
            tracking is abandoned (default 5).

    Returns:
        str | None: The terminal state (``"FINISHED"``, ``"FAILED"`` or
        ``"KILLED"``), or ``None`` when tracking was abandoned after
        ``max_errors`` consecutive failed requests.
    """
    app_url = f"{resource_manager_url}/ws/v1/cluster/apps/{application_id}"
    consecutive_errors = 0

    while True:
        response = requests.get(app_url, timeout=30)

        if response.status_code == 200:
            consecutive_errors = 0
            app = response.json()["app"]

            print(f"Allocated Memory: {app.get('allocatedMB')} MB")
            print(f"Allocated vCores: {app.get('allocatedVCores')}")

            # Stop once the application is over; there is nothing left to track
            state = app.get("state")
            if state in ("FINISHED", "FAILED", "KILLED"):
                return state
        else:
            consecutive_errors += 1
            print(f"Error fetching resource usage (HTTP {response.status_code}).")
            if consecutive_errors >= max_errors:
                return None

        time.sleep(poll_interval)

# Address of the YARN ResourceManager
resource_manager_url = "http://localhost:8088"

# Placeholder job description -- replace every value before running
job_name = "MyHadoopJob"
jar_file = "path_to_your_hadoop_jar_file"
main_class = "your_hadoop_main_class"
input_path, output_path = "input_path_in_hdfs", "output_path_in_hdfs"

# Resources requested for the job
memory_mb, vcores = 2048, 2

# Hand the job to YARN; a falsy id means the submission did not go through
application_id = submit_yarn_job(
    resource_manager_url,
    job_name,
    jar_file,
    main_class,
    input_path,
    output_path,
    memory_mb,
    vcores,
)

if application_id:
    # Watch what the job has been allocated
    track_resource_usage(resource_manager_url, application_id)


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import time


class MapReduceJob(MRJob):
    """Single-step job used as the workload when timing different ``--split-size`` values.

    The mapper and reducer below are runnable placeholders (a line counter);
    replace them with the real job logic.
    """

    def configure_args(self):
        """Register the ``--split-size`` command-line option."""
        super(MapReduceJob, self).configure_args()
        # NOTE(review): the option is parsed, but nothing in this class reads
        # self.options.split_size, so it does not yet influence how input is split.
        self.add_passthru_arg('--split-size', type=int, help='Input split size')

    def mapper(self, _, line):
        """Placeholder mapper: emit one count per input line."""
        # A body is required here; a comment-only body does not parse
        yield 'lines', 1

    def reducer(self, key, values):
        """Placeholder reducer: sum the counts emitted for ``key``."""
        yield key, sum(values)

    def steps(self):
        """Return the job's single map/reduce step."""
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer)
        ]


if __name__ == '__main__':
    # Candidate input split sizes, benchmarked one after another
    split_sizes = [100, 1000, 10000]

    for split_size in split_sizes:
        # Fresh job instance configured with this iteration's split size
        job = MapReduceJob(args=['--split-size', f"{split_size}"])

        # The clock covers creating the runner as well as running the job
        start_time = time.time()
        runner = job.make_runner()
        with runner:
            runner.run()
        execution_time = time.time() - start_time

        # One report per split size, closed by a separator line
        print(
            f"Split size: {split_size}",
            f"Execution time: {execution_time} seconds",
            "----------",
            sep="\n",
        )
