In [1]:
#q1
# Ask the NameNode at hdfs://boss:9000 for a cluster report and keep only the "Live datanodes (N)" line.
! hdfs dfsadmin -fs hdfs://boss:9000 -report | grep 'Live datanodes'

Live datanodes (2):


In [2]:
# Download CSV, convert to Parquet, and upload to HDFS
! rm -rf "/nb/hdma-wi-2021.csv"
! wget "https://pages.cs.wisc.edu/~harter/cs544/data/hdma-wi-2021.csv" -O "/nb/hdma-wi-2021.csv"

import pyarrow.csv as pv
import pyarrow.parquet as pq

csv_input_path = "/nb/hdma-wi-2021.csv"
parquet_output_path = "/nb/hdma-wi-2021.parquet"

data_table = pv.read_csv(csv_input_path)
pq.write_table(data_table, parquet_output_path)

# Removing existing files in HDFS
! hdfs dfs -rm -f hdfs://boss:9000/single.parquet
! hdfs dfs -rm -f hdfs://boss:9000/double.parquet

# Uploading Parquet files with different HDFS replication factors
! hdfs dfs -D dfs.block.size=1048576 -D dfs.replication=1 -copyFromLocal {parquet_output_path} hdfs://boss:9000/single.parquet
! hdfs dfs -D dfs.block.size=1048576 -D dfs.replication=2 -copyFromLocal {parquet_output_path} hdfs://boss:9000/double.parquet

--2024-10-26 03:27:56--  https://pages.cs.wisc.edu/~harter/cs544/data/hdma-wi-2021.csv
Resolving pages.cs.wisc.edu (pages.cs.wisc.edu)... 128.105.7.9
Connecting to pages.cs.wisc.edu (pages.cs.wisc.edu)|128.105.7.9|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 174944099 (167M) [text/csv]
Saving to: ‘/nb/hdma-wi-2021.csv’


2024-10-26 03:27:57 (164 MB/s) - ‘/nb/hdma-wi-2021.csv’ saved [174944099/174944099]



In [3]:
#q2
# `-du -h` prints, in human-readable units: file size, space consumed across all replicas, path.
# The second column doubles for double.parquet because it is stored with replication 2.
!hdfs dfs -du -h hdfs://boss:9000/single.parquet
!hdfs dfs -du -h hdfs://boss:9000/double.parquet


15.9 M  15.9 M  hdfs://boss:9000/single.parquet
15.9 M  31.7 M  hdfs://boss:9000/double.parquet


In [4]:
#q3
import requests

# WebHDFS REST endpoint (NameNode HTTP port 9870) for the single-replica file.
hdfs_url = "http://boss:9870/webhdfs/v1/single.parquet"
params = {
    "op": "GETFILESTATUS"
}
response = requests.get(hdfs_url, params=params)
# Fail loudly on an HTTP error; otherwise the error response body would be parsed and shown as the result.
response.raise_for_status()
file_status = response.json()
file_status

{'FileStatus': {'accessTime': 1729913292248,
  'blockSize': 1048576,
  'childrenNum': 0,
  'fileId': 16386,
  'group': 'supergroup',
  'length': 16642976,
  'modificationTime': 1729913293940,
  'owner': 'root',
  'pathSuffix': '',
  'permission': '644',
  'replication': 1,
  'storagePolicy': 0,
  'type': 'FILE'}}

In [5]:
#q4
url = "http://boss:9870/webhdfs/v1/single.parquet"
# noredirect=true makes the NameNode return the DataNode URL as JSON instead of an HTTP redirect.
query_params = {
    "op": "OPEN",
    "offset": 0,
    "noredirect": "true"
}

hdfs_response = requests.get(url, params=query_params)
# Raise a clear HTTPError instead of an opaque KeyError on 'Location' when the request fails.
hdfs_response.raise_for_status()
redirect_location = hdfs_response.json()['Location']
redirect_location

'http://5f58906980d3:9864/webhdfs/v1/single.parquet?op=OPEN&namenoderpcaddress=boss:9000&offset=0'

In [6]:
#q5
import re
from collections import defaultdict

def block_locations(hdfs_file_path, file_length, hdfs_block_size):
    """Count, per DataNode host, how many block-aligned offsets of an HDFS file it would serve.

    For each offset 0, hdfs_block_size, 2*hdfs_block_size, ... below file_length, asks the
    NameNode (via WebHDFS ``OPEN`` with ``noredirect=true``) where to read from, and takes
    the host name out of the returned ``Location`` URL.

    Args:
        hdfs_file_path: absolute HDFS path such as "/single.parquet"; appended to the WebHDFS base URL.
        file_length: file size in bytes (e.g. GETFILESTATUS 'length').
        hdfs_block_size: block size in bytes (e.g. GETFILESTATUS 'blockSize').

    Returns:
        dict mapping DataNode host name -> number of offsets redirected to that host.
        Offsets whose request does not return HTTP 200 are skipped (not counted).
    """
    from urllib.parse import urlparse

    hdfs_base_url = f"http://boss:9870/webhdfs/v1{hdfs_file_path}"
    block_map = defaultdict(int)

    # One session reuses the connection to the NameNode across all per-block requests.
    with requests.Session() as session:
        for current_offset in range(0, file_length, hdfs_block_size):
            query_parameters = {
                "op": "OPEN",
                "offset": current_offset,
                "noredirect": "true",
            }
            hdfs_response = session.get(hdfs_base_url, params=query_parameters)
            if hdfs_response.status_code != 200:
                # Best-effort, as before: an offset whose request fails is left out of the counts.
                continue
            block_location = hdfs_response.json()['Location']
            # urlparse copes with https and with a Location that has no explicit port;
            # the previous regex required a literal "http://host:" and crashed otherwise.
            node_id = urlparse(block_location).hostname
            block_map[node_id] += 1
    return dict(block_map)

# Look up the file's length and block size so block_locations can walk it block by block.
status_query = {"op": "GETFILESTATUS"}
file_response = requests.get("http://boss:9870/webhdfs/v1/single.parquet", params=status_query)
# Surface HTTP errors directly instead of a KeyError on 'FileStatus' below.
file_response.raise_for_status()
file_metadata = file_response.json()['FileStatus']
file_total_length = file_metadata['length']
block_size_hdfs = file_metadata['blockSize']

block_distribution = block_locations(hdfs_file_path="/single.parquet", file_length=file_total_length, hdfs_block_size=block_size_hdfs)
block_distribution

{'5f58906980d3': 10, 'fb839ec815a3': 6}

In [7]:
#q6
import re
from collections import defaultdict

def fetch_block_distribution(hdfs_file_path, file_length):
    """Count block replicas per DataNode host for an HDFS file using WebHDFS GETFILEBLOCKLOCATIONS.

    A single NameNode request returns every block in the byte range [0, file_length) together
    with the hosts holding it; each (block, host) pair adds one to that host's count, so a file
    with replication 2 contributes two counts per block.

    Args:
        hdfs_file_path: absolute HDFS path such as "/double.parquet"; appended to the WebHDFS base URL.
        file_length: number of bytes to cover, normally the file's full length.

    Returns:
        dict mapping host name -> number of blocks that list that host.

    Raises:
        requests.HTTPError: if the NameNode answers with an HTTP error status
            (previously the function silently returned None in that case).
    """
    hdfs_base_url = f"http://boss:9870/webhdfs/v1{hdfs_file_path}"
    query_parameters = {
        "op": "GETFILEBLOCKLOCATIONS",
        "length": file_length,
    }

    hdfs_response = requests.get(hdfs_base_url, params=query_parameters)
    hdfs_response.raise_for_status()

    # Named `blocks` rather than `block_locations` so it doesn't shadow the q5 helper of that name.
    blocks = hdfs_response.json()['BlockLocations']['BlockLocation']
    host_block_distribution = defaultdict(int)
    for block in blocks:
        for host in block['hosts']:
            host_block_distribution[host] += 1
    return dict(host_block_distribution)

# Look up the file length so the block-location query covers the whole file.
status_query = {"op": "GETFILESTATUS"}
file_response = requests.get("http://boss:9870/webhdfs/v1/double.parquet", params=status_query)
# Surface HTTP errors directly instead of a KeyError on 'FileStatus' below.
file_response.raise_for_status()
file_metadata = file_response.json()['FileStatus']
file_total_length = file_metadata['length']

# Fetch block distribution
block_distribution = fetch_block_distribution("/double.parquet", file_total_length)
block_distribution

{'5f58906980d3': 16, 'fb839ec815a3': 16}

In [8]:
#q7
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import time

import pyarrow.fs

hdfs = pa.fs.HadoopFileSystem(host="boss", port=9000)
# perf_counter is monotonic and high-resolution, the right clock for timing an interval
# (time.time() is wall-clock and can jump if the system clock is adjusted).
t0 = time.perf_counter()
with hdfs.open_input_file("/double.parquet") as parquet:
    # Reads every column even though only loan_amount is used; this is the baseline that q8 compares against.
    table = pq.read_table(parquet)
loans_avg = pc.mean(table["loan_amount"]).as_py()
t1 = time.perf_counter()
time_difference = t1 - t0
loans_avg


2024-10-26 03:28:31,874 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


204961.21752386744

In [9]:
#q8
hdfs = pa.fs.HadoopFileSystem(host="boss", port=9000)
# Same monotonic clock as recommended for interval timing; the ratio below is unitless either way.
t0 = time.perf_counter()
with hdfs.open_input_file("/double.parquet") as parquet_file:
    # Column projection: pyarrow reads only the requested loan_amount column.
    table = pq.read_table(parquet_file, columns=["loan_amount"])
avg_loan_amount = pc.mean(table["loan_amount"]).as_py()
t1 = time.perf_counter()
time2 = t1 - t0
# Speedup of the single-column read over the full-table read timed in q7 (`time_difference`).
optimized = time_difference / time2
optimized

25.175973389594933