In [1]:
import pandas as pd
import os
import pluto_parquet as pp
from zips import unzip_to_temp
from shutil import rmtree
from functools import partial
import warnings
from google.cloud import storage
import tempfile
import logging
from logging.config import fileConfig
from utils import cpu_use

In [2]:
# Suppress any warning whose message matches the Parquet "initial implementation" notice.
warnings.filterwarnings(action='ignore', message='.*initial implementation of Parquet.*')
# Credentials file for the Google Cloud client created later in this notebook
# (path is relative to the working directory).
os.environ.update({"GOOGLE_APPLICATION_CREDENTIALS": 'etc/secret.json'})

In [3]:
# Logging is configured from an ini file; the target log file path is passed to
# the ini through the `logfilename` default.
log_file = 'extract_load.log'
loginipath = 'logs/logging_config.ini'
log_path = os.path.join('logs', log_file)

fileConfig(fname=loginipath, defaults=dict(logfilename=log_path))
logger = logging.getLogger(name='sLogger')

In [4]:
# Storage client plus the bucket object that the load loop uploads blobs into.
client = storage.Client()
bucket = client.bucket(bucket_name='raw-pluto')

In [5]:
# Table of archives to process; the load loop reads its `year` and `path` columns.
zip_links = pd.read_csv(filepath_or_buffer="etc/zip_links.csv")

In [None]:
# For every archive listed in `zip_links`: unzip it to a temporary directory,
# aggregate the valid files for that year into a single frame, write the frame
# to a temporary parquet file and upload it to the bucket as
# `pluto_<year>.parquet`.
for _, row in zip_links.iterrows():

    pluto_year = str(row.year)
    zip_path = row.path

    tmp_dir = unzip_to_temp(zip_path)
    try:
        paths = pp.filter_valid_pluto(tmp_dir)
        # NOTE(review): the '../..' prefix presumably re-roots the returned paths
        # relative to the notebook's working directory — confirm against
        # `pp.filter_valid_pluto`.
        root_paths = ['../..' + p for p in paths]

        aggregate_year = partial(pp.agg_from_path, year=pluto_year)
        gdf_list = list(map(aggregate_year, root_paths))

        logger.info(f'{cpu_use()} aggregating files: {len(gdf_list)} from year: {pluto_year}')
        df = pd.concat(gdf_list)
        # Drop the per-file frames as soon as they are merged to release memory.
        del gdf_list
        logger.info(cpu_use())
    finally:
        # Always remove the extracted files, even when aggregation fails, so a
        # bad archive does not leave a temporary directory behind.
        rmtree(tmp_dir)

    logger.info(f'{cpu_use()} cleaning temporary directory')

    with tempfile.NamedTemporaryFile() as fp:
        logger.info(f'{cpu_use()} aggregating files: writing aggregated file to tempfile')
        df.to_parquet(fp)
        # The upload below re-reads the file by name, so push any bytes still
        # held in the handle's buffer to disk first; otherwise the uploaded
        # object can be truncated.
        fp.flush()
        data_shape = df.shape
        del df
        logger.info(f'{cpu_use()} deleted in memory dataframe')
        blob_name = f'pluto_{pluto_year}.parquet'
        logger.info(f'making blob: {blob_name}')
        blob = bucket.blob(blob_name)
        blob.upload_from_filename(fp.name)
        logger.info(f'succesfully loaded df with rows: {data_shape[0]} and columns: {data_shape[1]} to file: {blob_name}')

INFO - Downloading ZIPFile https://www1.nyc.gov/assets/planning/download/zip/data-maps/open-data/mappluto_02b.zip
INFO - Unzipping https://www1.nyc.gov/assets/planning/download/zip/data-maps/open-data/mappluto_02b.zip
INFO - reading from 0 to 50000 rows from file: qnmappluto.shp for year 2002
INFO - transforming data
INFO - reading from 50000 to 100000 rows from file: qnmappluto.shp for year 2002
INFO - transforming data
INFO - reading from 100000 to 150000 rows from file: qnmappluto.shp for year 2002
INFO - transforming data
INFO - reading from 150000 to 200000 rows from file: qnmappluto.shp for year 2002
INFO - transforming data
INFO - reading from 200000 to 250000 rows from file: qnmappluto.shp for year 2002
INFO - transforming data
INFO - reading from 250000 to 300000 rows from file: qnmappluto.shp for year 2002
INFO - transforming data
INFO - reading from 300000 to 350000 rows from file: qnmappluto.shp for year 2002
INFO - transforming data
INFO - cpu usage: 46.2% aggregating chun