# Import Statements

In [1]:
import os
import requests
import json
import pyarrow as pa
import pyarrow.fs
import io

# Part 1: Deployment and Data Upload

In [2]:
#q1
! hdfs dfsadmin -fs hdfs://boss:9000 -report

Configured Capacity: 51642105856 (48.10 GB)
Present Capacity: 31571872892 (29.40 GB)
DFS Remaining: 31042887680 (28.91 GB)
DFS Used: 528985212 (504.48 MB)
DFS Used%: 1.68%
Replicated Blocks:
	Under replicated blocks: 0
	Blocks with corrupt replicas: 0
	Missing blocks: 0
	Missing blocks (with replication factor 1): 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0
Erasure Coded Block Groups: 
	Low redundancy block groups: 0
	Block groups with corrupt internal blocks: 0
	Missing block groups: 0
	Low redundancy blocks with highest priority to recover: 0
	Pending deletion blocks: 0

-------------------------------------------------
Live datanodes (2):

Name: 172.24.0.2:9866 (project-4-p4-rfwong07-dn-2.project-4-p4-rfwong07_default)
Hostname: 4011abf154e0
Decommission Status : Normal
Configured Capacity: 25821052928 (24.05 GB)
DFS Used: 268276021 (255.85 MB)
Non DFS Used: 10014555851 (9.33 GB)
DFS Remaining: 15521443840 (14.46 GB)
DFS Used%: 1.04%
DFS 

In [3]:
#Write code that downloads  https://pages.cs.wisc.edu/~harter/cs544/data/hdma-wi-2021.csv if
# it hasn't been already downloaded
if not os.path.exists('hdma-wi-2021.csv'):
    ! wget https://pages.cs.wisc.edu/~harter/cs544/data/hdma-wi-2021.csv


In [4]:
#Remove the single.csv and double.csv to avoid clutter/duplicates
!hdfs dfs -rm -f hdfs://boss:9000/single.csv
!hdfs dfs -rm -f hdfs://boss:9000/double.csv

Deleted hdfs://boss:9000/single.csv
Deleted hdfs://boss:9000/double.csv


In [5]:
#Next use `hdfs dfs -cp` to copy to cluster
# !hdfs dfs -mkdir hdfs://boss:9000/data
!hdfs dfs -Ddfs.block.size=1048576 -Ddfs.replication=1 -cp hdma-wi-2021.csv hdfs://boss:9000/single.csv
!hdfs dfs -Ddfs.block.size=1048576 -Ddfs.replication=2 -cp hdma-wi-2021.csv hdfs://boss:9000/double.csv

In [6]:
#q2
!hdfs dfs -du -h hdfs://boss:9000/ #Run a du command with hdfs dfs to see.

166.8 M  333.7 M  hdfs://boss:9000/double.csv
166.8 M  166.8 M  hdfs://boss:9000/single.csv


# Part 2: WebHDFS

In [7]:
#q3
# Fetch the file status of /single.csv from the NameNode's WebHDFS endpoint.
namenode_url = "http://boss:9870/webhdfs/v1/single.csv"
getfilestatus_url = f"{namenode_url}?op=GETFILESTATUS"
r = requests.get(getfilestatus_url)
# WebHDFS reports failures as a JSON body plus an HTTP error status; without this
# check that error body would be displayed below as if it were the answer.
r.raise_for_status()
filestatus_dict = r.json()  # parsed JSON response as a Python dictionary
filestatus_dict

{'FileStatus': {'accessTime': 1710622125129,
  'blockSize': 1048576,
  'childrenNum': 0,
  'fileId': 16388,
  'group': 'supergroup',
  'length': 174944099,
  'modificationTime': 1710622129054,
  'owner': 'root',
  'pathSuffix': '',
  'permission': '644',
  'replication': 1,
  'storagePolicy': 0,
  'type': 'FILE'}}

In [8]:
#q4
# Ask the NameNode where /single.csv can be read from. noredirect=true makes it
# answer with a JSON body holding the DataNode URL instead of an HTTP redirect.
namenode_url = "http://boss:9870/webhdfs/v1/single.csv"
location_url = f"{namenode_url}?op=OPEN&offset=0&noredirect=true"
r = requests.get(location_url)
# Fail loudly on an HTTP error instead of treating an error body as a location.
r.raise_for_status()
# Read the key by name: taking "the first value" of the response would silently
# return the wrong thing if the body were ever anything other than {'Location': ...}.
location_string = r.json()['Location']
location_string

'http://4011abf154e0:9864/webhdfs/v1/single.csv?op=OPEN&namenoderpcaddress=boss:9000&offset=0'

In [9]:
#q5
# Count how many blocks of /single.csv each DataNode holds.
# Reference: https://hadoop.apache.org/docs/r3.3.6/hadoop-project-dist/hadoop-hdfs/WebHDFS.html#Get_File_Block_Locations
namenode_url = "http://boss:9870/webhdfs/v1/single.csv"
location_url = f"{namenode_url}?op=GETFILEBLOCKLOCATIONS"
r = requests.get(location_url)
# Surface HTTP failures directly rather than as a KeyError on the lookups below.
r.raise_for_status()
block_location_dict = r.json()

block_location_list = block_location_dict['BlockLocations']['BlockLocation']

hosts_count_dict = {}  # host id -> number of blocks whose first listed host is that id
for item in block_location_list:
    # Only the first listed host is counted; single.csv was uploaded with
    # replication 1, so each block is expected to list exactly one host.
    cntr_value = item['hosts'][0]
    hosts_count_dict[cntr_value] = hosts_count_dict.get(cntr_value, 0) + 1
hosts_count_dict

{'4011abf154e0': 90, '661631f2ec8b': 77}

# Part 3: PyArrow

In [10]:
#q6
# Connect to HDFS through pyarrow (NameNode "boss", port 9000).
hdfs = pa.fs.HadoopFileSystem(host="boss", port=9000)

# Read the first 10 bytes of /single.csv.
# The `with ... as` pattern is taken from lec/18-hdfs/nb/hdfs.ipynb.
with hdfs.open_input_file("/single.csv") as single_csv:
    first_10_bytes = single_csv.read_at(10, 0)  # 10 bytes starting at offset 0
first_10_bytes

2024-03-16 20:49:05,624 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


b'activity_y'

In [11]:
#q7
# Count the lines of /single.csv that contain the text "Single Family".
# The file is streamed line by line through a text wrapper
# (approach taken from lec/18-hdfs/nb/hdfs.ipynb).
with hdfs.open_input_file('/single.csv') as f:
    reader = io.TextIOWrapper(io.BufferedReader(f))
    single_family_count = sum(1 for line in reader if "Single Family" in line)
single_family_count

444874