# Assignment 7

The goal of this assignment is to work with the file system, concurrency, and basic data processing in Python.

# 0. Name & Z-ID (5 pts)

Juviny Noriega

Z1917876



# 1. Download & Extract Files



# 1b. [CSCI 503] Download & Extract Files (30 pts)

490 student attempting extra credit


In [17]:

import aiohttp,asyncio,os

#function that sucessfully fetches files and downloads


#function to read one file at a time
async def download_file(session, base_url, filename, directory='downloads'):
    if not os.path.exists(directory): 
        os.makedirs(directory)
    
    filepath = os.path.join(directory, filename)
    
    if not os.path.exists(filepath):  # Check if file has already been downloaded
        print(f"Starting download from {base_url}{filename}")
        async with session.get(base_url + filename) as response:
            if response.status == 200:
                with open(filepath, 'wb') as f:
                    while True:
                        chunk = await response.content.read(1024)
                        if not chunk:
                            break
                        f.write(chunk)
                print(f"Downloaded {filename} successfully")
            else:
                print(f"Failed to download {filename}. Status code: {response.status}")
    else:
        print(f"{filename} already exists.")

# Function to handle multiple downloads
async def download_all_files(base_url, file_names):
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(download_file(session, base_url, filename)) for filename in file_names]
        await asyncio.gather(*tasks)

        

In [36]:

base_url = "https://faculty.cs.niu.edu/~dakoop/cs503-2024sp/a7/"
file_names = ["ab.zip", "c.zip", "df.zip", "gk.zip", "ln.zip", "o.zip", "pr.zip", "s.zip", "tv.zip", "wz.zip"]

# Directly await the async function in the notebook sinc ejupter notebook does asyncio.run()
results = await download_all_files(base_url, file_names)
for result in results:
    print(result)



ab.zip already exists.
c.zip already exists.
df.zip already exists.
gk.zip already exists.
ln.zip already exists.
o.zip already exists.
pr.zip already exists.
s.zip already exists.
tv.zip already exists.
wz.zip already exists.


TypeError: 'NoneType' object is not iterable

# Find Matching Files (15 pts)



In [19]:
# ' Some counties have their data stored in the numpy format (.npy) while others use the CSV format (.csv). In addition, some counties have 
# updated data that we want to use (not the original, older data). In these cases, there is a directory named mod or update in the second level 
# of the zip file (e.g. data/ab/mod) that contains the files we need. For a given letter (e.g. a.npy), if there is a file in a subdirectory of a mod 
# or update directory, use it. If there is not a file in that subdirectory, use the original file. Also ignore files with extensions that are not .npy or .csv. 
# Create a list of all the paths (as pathlib.Path objects) that will need to be processed. Note that some files may'

import zipfile
from pathlib import Path

def extract_zip_files(directory='downloads'):
    for zip_filename in os.listdir(directory):
        zip_path = Path(directory) / zip_filename
        extract_path = zip_path.parent / zip_filename[:-4] / 'data'

        if not extract_path.exists():
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(extract_path)
            print(f"Extracted {zip_filename}")
        else:
            print(f"Extraction directory {extract_path} already exists")


def find_priority_files(directory='downloads'):
    base_path = Path(directory)
    priority_files = []

    for zip_folder in base_path.iterdir():
        if zip_folder.is_dir():
            data_dir = zip_folder / 'data'
            if data_dir.exists():
                print(f"Checking {data_dir}")  # Debug print
                for folder in data_dir.iterdir():
                    mod_update_dirs = [d for d in folder.rglob('*') if d.is_dir() and d.name in ['mod', 'update']]
                    found = False

                    for mod_dir in mod_update_dirs:
                        for file in mod_dir.rglob('*'):
                            if file.suffix in ['.npy', '.csv']:
                                priority_files.append(file)
                                found = True
                    
                    if not found:
                        for file in folder.rglob('*'):
                            if file.suffix in ['.npy', '.csv']:
                                priority_files.append(file)

    return priority_files


# Get the list of prioritized file paths
file_paths = find_priority_files()
for path in file_paths:
    print(path)


# 3. Structural Pattern Matching to Process a File (20 pts)

In [37]:
import numpy as np
import pandas as pd
from pathlib import Path

def process_file(p: Path):
    # Identify if the file is from a mod/update directory or not
    updated = 'mod' in p.parts or 'update' in p.parts

    match p.suffix, updated:
        case '.npy', True:
            df = pd.DataFrame(np.load(p, allow_pickle=True))
        case '.csv', True:
            df = pd.read_csv(p)
        case '.npy', False:
            df = pd.DataFrame(np.load(p, allow_pickle=True))
            df['value'] *= 10  # Modify the 'value' column for non-updated npy files
        case '.csv', False:
            df = pd.read_csv(p)
            df['value'] *= 10  # Modify the 'value' column for non-updated csv files
    
    # Replace -999.0 with NaN in 'value' and 'number_of_accounts'
    df.replace(-999.0, np.nan, inplace=True)
    
    # Filter records where 'data_class' is 'electricity'
    df_electricity = df[df['data_class'] == 'electricity']
    
    return df_electricity

# 4. Using Threading to Process All Files (15 pts)

In [40]:
def process_all_files(file_paths):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = list(executor.map(process_file, (Path(path) for path in file_paths if path)))
    if results:
        concatenated_df = pd.concat(results, ignore_index=True)
        return concatenated_df
    else:
        return pd.DataFrame()  # Return an empty DataFrame if no results

# Main routine to orchestrate tasks
async def main():
    base_url = "https://example.com/path/to/files/"
    file_names = ["ab.zip", "c.zip", "df.zip", "gk.zip", "ln.zip", "o.zip", "pr.zip", "s.zip", "tv.zip", "wz.zip"]
    file_paths = await download_all_files(base_url, file_names)
    concatenated_df = process_all_files(file_paths)
    if not concatenated_df.empty:
        write_yearly_files(concatenated_df, 2021, 2023)

if __name__ == "__main__":
    import nest_asyncio
    nest_asyncio.apply()
    asyncio.run(main())


ab.zip already exists.
c.zip already exists.
df.zip already exists.
gk.zip already exists.
ln.zip already exists.
o.zip already exists.
pr.zip already exists.
s.zip already exists.
tv.zip already exists.
wz.zip already exists.


TypeError: 'NoneType' object is not iterable