#### Prerequisites

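Run ```conda install aiohttp nest_asyncio pandas requests``` before running the code,
or uncomment and run the command in the cell below to install the packages.
(`asyncio` and `gzip` are part of the Python standard library and do not need to be installed.)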

In [414]:
# %conda install aiohttp nest_asyncio pandas requests

In [415]:
# 1a [CSCI 490] Download & Extract Files (20 pts)

import requests
import os
import zipfile

# One archive URL per decade, 1970 through 2020 inclusive.
decades_urls = [
    f'http://faculty.cs.niu.edu/~dakoop/cs503-2022sp/a7/unemp-{decade}.zip'
    for decade in range(1970, 2021, 10)
]

def download_files_and_extract(url, response):
    """Save a downloaded zip archive into the working directory and unpack it.

    The archive file name is the last path component of ``url``; the
    directory it is expected to unpack into is that name up to its first
    ``.``. If that directory already exists nothing is written or extracted.
    The zip file is always removed afterwards, even when extraction fails,
    so a corrupt download is never reused by a later run.

    Args:
        url: URL the archive was fetched from (used only to derive names).
        response: Object exposing the downloaded bytes as ``.content``
            (e.g. a ``requests`` response).

    Raises:
        zipfile.BadZipFile: If the saved file is not a valid zip archive.
    """
    file_name = url.split('/')[-1]
    current_dir_path = os.getcwd()
    zip_file_path = os.path.join(current_dir_path, file_name)
    unzip_file_path = os.path.join(current_dir_path, file_name.split('.')[0])

    try:
        if not os.path.exists(unzip_file_path):
            # Reuse an archive that is already on disk; otherwise save the body.
            if not os.path.exists(zip_file_path):
                with open(zip_file_path, 'wb') as f:
                    f.write(response.content)
            with zipfile.ZipFile(zip_file_path, "r") as zf:
                zf.extractall(current_dir_path)
    finally:
        # Clean up on success and on failure alike: a leftover broken archive
        # would otherwise be picked up instead of a fresh download next time.
        if os.path.exists(zip_file_path):
            os.remove(zip_file_path)

# Fetch each decade's archive one after another and unpack it.
for url in decades_urls:
    # Without a timeout, requests waits indefinitely on an unresponsive
    # server, which would hang the whole notebook.
    response = requests.get(url, timeout=60)
    download_files_and_extract(url, response)

In [416]:
# 1b [CSCI 503] Download & Extract Files (30 pts)

import asyncio
import aiohttp

import nest_asyncio
nest_asyncio.apply()

def _unpack_archive(zip_file_path, target_dir, payload):
    """Write ``payload`` to ``zip_file_path`` (if given), extract it, delete it.

    Plain blocking helper meant to run in a worker thread. The archive is
    removed even when extraction fails so a corrupt file is never reused.
    """
    try:
        if payload is not None:
            with open(zip_file_path, 'wb') as f:
                f.write(payload)
        with zipfile.ZipFile(zip_file_path, "r") as zf:
            zf.extractall(target_dir)
    finally:
        if os.path.exists(zip_file_path):
            os.remove(zip_file_path)


async def extract_files_async(session, url):
    """Download the zip archive at ``url`` and unpack it into the working dir.

    The archive file name is the last path component of ``url``; the
    directory it is expected to unpack into is that name up to its first
    ``.``. If that directory already exists, no request is made and any
    leftover archive is simply deleted. An archive already on disk is reused
    instead of being downloaded again.

    Args:
        session: Client session exposing ``get(url)`` as an async context
            manager whose response provides ``raise_for_status()`` and an
            awaitable ``read()`` (e.g. ``aiohttp.ClientSession``).
        url: Address of the zip archive.

    Raises:
        zipfile.BadZipFile: If the downloaded data is not a valid zip archive.
        Whatever ``response.raise_for_status()`` raises for an HTTP error.
    """
    file_name = url.split('/')[-1]
    current_dir_path = os.getcwd()
    zip_file_path = os.path.join(current_dir_path, file_name)
    unzip_file_path = os.path.join(current_dir_path, file_name.split('.')[0])

    if os.path.exists(unzip_file_path):
        if os.path.exists(zip_file_path):
            os.remove(zip_file_path)
        return

    payload = None
    if not os.path.exists(zip_file_path):
        async with session.get(url) as response:
            response.raise_for_status()
            payload = await response.read()

    # Disk writes and zip extraction are blocking; run them in a worker
    # thread so the other downloads keep progressing on the event loop.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None, _unpack_archive, zip_file_path, current_dir_path, payload
    )


async def dowload_file_async(session, url):
    """Download and unpack a single archive; see ``extract_files_async``."""
    await extract_files_async(session, url)
        
async def download_all_files_async(decades_urls):
    """Download and unpack every archive in ``decades_urls`` concurrently.

    All downloads share one ``aiohttp.ClientSession``. A failure of one
    download does not cancel the others; each failure is reported as a
    ``RuntimeWarning`` once everything has finished, instead of being
    silently discarded.

    Args:
        decades_urls: Iterable of archive URLs.
    """
    # Local import keeps this block self-contained.
    import warnings

    urls = list(decades_urls)
    async with aiohttp.ClientSession() as session:
        # return_exceptions=True lets every download run to completion;
        # exceptions come back as values, in the same order as ``urls``.
        outcomes = await asyncio.gather(
            *(dowload_file_async(session, url) for url in urls),
            return_exceptions=True,
        )

    for url, outcome in zip(urls, outcomes):
        if isinstance(outcome, BaseException):
            warnings.warn(f'Download failed for {url}: {outcome!r}', RuntimeWarning)

# Run the concurrent download to completion on the current event loop.
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(download_all_files_async(decades_urls))

In [417]:
# 2 Find Matching Files (10 pts)
import glob

# Absolute paths of every CSV file at or below the working directory.
# glob yields matches in filesystem-dependent order, so sort them to keep
# the listing reproducible from run to run.
matched_files = sorted(
    os.path.join(os.getcwd(), relative_path)
    for relative_path in glob.glob("**/*.csv", recursive=True)
)
print(matched_files)

['/workspace/CSCI_Assignments/assignment-7/unemp-1970/unemp.csv', '/workspace/CSCI_Assignments/assignment-7/unemp-1980/unemp.csv', '/workspace/CSCI_Assignments/assignment-7/unemp-1990/employment.csv', '/workspace/CSCI_Assignments/assignment-7/unemp-2000/unemployment.csv', '/workspace/CSCI_Assignments/assignment-7/unemp-2010/employment.csv', '/workspace/CSCI_Assignments/assignment-7/unemp-2020/unemp.csv']


In [418]:
# 3. Use Threads to Process Files (30 pts)

from concurrent.futures import ThreadPoolExecutor
import pandas as pd
import gzip

def process_data(data_file, counties=None):
    """Load one CSV file and keep only the rows for the selected counties.

    The ``COUNTY`` column is upper-cased before filtering, and a ``RATE``
    column (``UNEMPLOYED_NUMBER / LABOR_FORCE``) is added to the result.

    Args:
        data_file: Path (or anything ``pandas.read_csv`` accepts) of the CSV.
        counties: Upper-case county names to keep. Defaults to the
            module-level ``counties_with_suffix`` list.

    Returns:
        A new DataFrame holding the matching rows plus the ``RATE`` column.
    """
    if counties is None:
        # Resolved at call time, so the global may be defined after this function.
        counties = counties_with_suffix

    df = pd.read_csv(data_file)
    df['COUNTY'] = df['COUNTY'].str.upper()
    # Copy the filtered rows so that adding RATE modifies an independent
    # frame rather than a slice of ``df`` (avoids SettingWithCopyWarning).
    selected = df[df['COUNTY'].isin(counties)].copy()
    selected['RATE'] = selected['UNEMPLOYED_NUMBER'] / selected['LABOR_FORCE']
    return selected

# Counties of interest: bare names, and the same names with the " COUNTY"
# suffix used when matching against the upper-cased COUNTY column.
counties_no_suffix = [
    "DEKALB",
    "KANE",
    "BOONE",
    "MCHENRY",
    "WINNEBAGO",
    "OGLE",
    "LEE",
    "KENDALL",
]
counties_with_suffix = [f"{name} COUNTY" for name in counties_no_suffix]

# Load and filter every matched CSV on a thread pool, then stack the
# per-file frames into a single table.
with ThreadPoolExecutor(max_workers=13) as pool:
    frames = list(pool.map(process_data, matched_files))
result = pd.concat(frames)

# Write one gzip-compressed CSV per county, named after the bare county name.
for county in counties_no_suffix:
    county_df = result[result['COUNTY'] == f'{county} COUNTY']
    county_df.to_csv(f'{county}.csv.gz', index=False, compression="gzip")


In [419]:
# Read each county's compressed CSV back and print the mean of its RATE
# column, going through the counties in alphabetical order.
counties_no_suffix = [
    'DEKALB', 'KANE', 'BOONE', 'MCHENRY',
    'WINNEBAGO', 'OGLE', 'LEE', 'KENDALL',
]
for county in sorted(counties_no_suffix):
    county_frame = pd.read_csv(f'{county}.csv.gz')
    mean_rate = county_frame['RATE'].mean()
    print(county, mean_rate)

BOONE 0.08099938784155404
DEKALB 0.05766694572727062
KANE 0.06465744592318971
KENDALL 0.05521362047852945
LEE 0.06341724493660843
MCHENRY 0.05963539940389765
OGLE 0.06660581534993118
WINNEBAGO 0.07757288231143361


In [420]:
# Optional Delete files and folders
# If not needed comment the code and run all the cells again.

import shutil

working_dir = os.getcwd()

# Remove the directory belonging to each decade archive, if present.
for url in decades_urls:
    archive_name = url.split('/')[-1]
    decades_dirs = f"{working_dir}/{archive_name.split('.')[0]}"
    if os.path.exists(decades_dirs):
        shutil.rmtree(decades_dirs)

# Remove the per-county compressed CSV files, if present.
for i in counties_no_suffix:
    fname = f"{working_dir}/{i}.csv.gz"
    if os.path.exists(fname):
        os.remove(fname)