#### To Do
- Skip the upload if the file already exists in the bucket folder
- Make `unzip_blob` a subflow?
- Delete the zip blob once unzipping is complete
- Use PySpark or dbt to load the data into BigQuery?

In [81]:
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile, is_zipfile
import pandas as pd
from prefect import flow, task
from google.cloud import storage
import os

In [82]:
# catch = pd.read_csv('../tmp/catch_19814.csv')
# size = pd.read_csv('../tmp/size_19826.csv')
# trip = pd.read_csv('../tmp/trip_19842.csv')

# catch_dtype = catch.dtypes.to_dict()
# size_dtype = size.dtypes.to_dict()
# trip_dtype = trip.dtypes.to_dict()

# print(size_dtype)

In [83]:
# Column-name -> Python type maps for the three CSV families (catch, trip,
# size), intended to be passed as ``dtype=`` to ``pd.read_csv``.
# NOTE(review): plain ``int`` makes read_csv fail on columns with missing
# values; consider pandas' nullable "Int64" if that happens.
catch_dtype = dict(
    common=str, strat_id=str, psu_id=int,
    id_code=int, MODE_FX=int, AREA_X=int,
    ST=int, SUB_REG=int, WAVE=int,
    year=int, month=int, kod=str,
    SP_CODE=float, CLAIM=float, RELEASE=float,
    HARVEST=float, CLAIM_UNADJ=int, HARVEST_UNADJ=int,
    RELEASE_UNADJ=int, tot_len_a=float, wgt_a=float,
    tot_len_b1=float, wgt_b1=float, tot_cat=float,
    wgt_ab1=float, tot_len=float, Landing=float,
    VAR_ID=str, ARX_METHOD=float, ALT_FLAG=int,
    fl_reg=int, wp_catch_precal=float, wp_int=float,
    wp_catch=float, date_published=str,
)

trip_dtype = dict(
    prim2_common=str, prim1_common=str, strat_id=str,
    psu_id=int, ADD_HRS=float, AREA=int,
    AREA_X=int, BOAT_HRS=float, CATCH=int,
    CNTRBTRS=int, CNTY=int, CNTY_RES=int,
    COASTAL=str, FFDAYS2=int, FFDAYS12=int,
    FIRST=float, HRSF=float, ID_CODE=int,
    INTSITE=int, MODE_F=int, MODE_FX=int,
    NUM_TYP2=int, NUM_TYP3=int, NUM_TYP4=int,
    NUM_TYP6=float, ON_LIST=float, PARTY=float,
    REG_RES=float, ST=int, ST_RES=int,
    SUB_REG=int, TELEFON=int, WAVE=int,
    YEAR=int, ASG_CODE=int, month=int,
    kod=str, MODE_ASG=float, new_list=float,
    PRT_CODE=int, CELLTYPE=float, fshinsp_a=float,
    num_fish_a=float, fl_reg=int, ADD_PH=int,
    AREA_NC=float, COUNTY=str, DATE1=str,
    DIST=int, F_BY_P=int, GEAR=int,
    MODE2001=float, MUNI_RES=float, MUNI_TRP=float,
    PRIM1=float, PRIM2=float, PVT_RES=int,
    RIG=int, SEP_FISH=int, TIME=int,
    ZIP=float, AGE=float, wp_int=float,
    VAR_ID=str, ARX_METHOD=float, ALT_FLAG=int,
    LEADER=int, date_published=str,
)

size_dtype = dict(
    AREA_X=int, ID_CODE=int, MODE_FX=int,
    ST=int, SUB_REG=int, WAVE=int,
    YEAR=int, month=int, kod=str,
    SP_CODE=float, LNGTH=float, WGT=float,
    lngth_imp=int, wgt_imp=int, strat_id=str,
    psu_id=int, common=str, wgt_unadj=float,
    wp_size=float, l_in_bin=float, l_cm_bin=float,
    VAR_ID=str, ARX_METHOD=float, ALT_FLAG=int,
    date_published=str,
)

In [84]:
@task()
def read_gcs_bucket(bucket) -> list:
    """Return the names of all blobs whose name starts with ``zip``.

    The names are printed before being returned.
    """
    zip_names = [entry.name for entry in bucket.list_blobs(prefix="zip")]
    print(zip_names)
    return zip_names


 `@task(name='my_unique_name', ...)`


In [85]:
@task()
def process_file(filename: str, file: bytes, bucket) -> tuple:
    """Stage one extracted CSV on local disk and create its destination blob.

    The raw bytes are written to ``../tmp/<stem>.csv``. A blob handle is then
    created at ``<folder>/<stem>.csv`` in ``bucket``, where ``<folder>`` is
    the part of ``filename`` before the first underscore
    (e.g. ``catch_19811.csv`` -> ``catch/catch_19811.csv``).

    Args:
        filename: Name of the member inside the zip archive.
        file: Raw contents of that member.
        bucket: GCS bucket in which the destination blob is created.

    Returns:
        ``(stem, local_path, blob)``: the filename without its extension, the
        local temp-file path, and the destination blob (not yet uploaded).

    Raises:
        ValueError: if ``filename`` does not start with ``catch``, ``trip`` or
            ``size``.
    """
    folder = filename.split("_")[0]
    # Fail fast with a clear message; an unknown prefix used to leave
    # ``blob`` unbound and crash with UnboundLocalError at the return.
    if folder not in ("catch", "trip", "size"):
        raise ValueError(
            f"Unrecognised file {filename!r}: expected a catch_, trip_ or size_ prefix"
        )

    tmp_dir = "../tmp"
    Path(tmp_dir).mkdir(parents=True, exist_ok=True)

    trunc_fn = filename.split(".")[0]
    output_file = f"{tmp_dir}/{trunc_fn}.csv"
    # The context manager closes the handle even if the write raises.
    with open(output_file, "wb") as outfile:
        outfile.write(file)

    blob = bucket.blob(f"{folder}/{trunc_fn}.csv")

    # TODO: optionally convert to parquet before upload, e.g.
    # pd.read_csv(output_file, dtype=<catch/trip/size dtype>, low_memory=False)
    #   .to_parquet(output_file.replace("csv", "parquet"))
    return trunc_fn, output_file, blob


 `@task(name='my_unique_name', ...)`


In [86]:
@task()
def write_gcs(filename: str, file: str, blob) -> None:
    """Upload a staged local CSV to GCS, then delete the local copy.

    Args:
        filename: Filename without its extension. It is kept for interface
            compatibility; the path that is uploaded comes from ``file``.
        file: Local path of the staged CSV.
        blob: Destination blob that the file is uploaded to.
    """
    with open(file, "rb") as myfile:
        blob.upload_from_file(myfile)

    # Use the path we were given rather than rebuilding it from ``filename``,
    # so the size report and cleanup always target the uploaded file.
    print(f"Csv file size is {os.stat(file).st_size}")
    # The file is removed only after a successful upload, so a failed upload
    # leaves it in place for a retry.
    os.remove(file)

    # print(f'Parquet file size is {os.stat(f"../tmp/{filename}.parquet").st_size}')
    # os.remove(f"../tmp/{filename}.parquet")  # delete/remove outfile


 `@task(name='my_unique_name', ...)`


In [87]:
# def unzip_blob_csv(source_path: str, bucket) -> list:
#     """Unzip folder"""
#     blob = bucket.blob(source_path)

#     zipbytes = BytesIO(blob.download_as_string())

#     if is_zipfile(zipbytes):
#         with ZipFile(zipbytes, "r") as myzip:
#             for contentfilename in myzip.namelist():
#                 folder = contentfilename.split("_")[0]
#                 contentfile = myzip.read(contentfilename)

#                 tmp_dir = "../tmp"
#                 Path(tmp_dir).mkdir(parents=True, exist_ok=True)

#                 trunc_fn = contentfilename.split("_")[1]
#                 output_file = f"../tmp/{trunc_fn}"
#                 outfile = open(output_file, "wb")
#                 outfile.write(contentfile)
#                 outfile.close()

#                 if folder == "catch":
#                     blob = bucket.blob(f"catch/{trunc_fn}")
#                 elif folder == "trip":
#                     blob = bucket.blob(f"trip/{trunc_fn}")
#                 elif folder == "size":
#                     blob = bucket.blob(f"size/{trunc_fn}")

#                 with open(output_file, "rb") as mycsv:
#                     blob.upload_from_file(mycsv)

#                 os.remove(f"../tmp/{trunc_fn}")  # delete/remove outfile


In [88]:
@task()
def unzip_blob(source_path: str, bucket) -> dict:
    """Download a zip blob from GCS and return its members in memory.

    Args:
        source_path: Blob name of the zip archive inside ``bucket``
            (e.g. ``zip/ps_2021_csv.zip``).
        bucket: GCS bucket holding the archive.

    Returns:
        Mapping of member filename -> raw bytes, in archive order. Directory
        entries are skipped. The mapping is empty if the blob is not a zip
        archive.
    """
    blob = bucket.blob(source_path)
    # download_as_string() is a deprecated alias of download_as_bytes().
    zipbytes = BytesIO(blob.download_as_bytes())

    if not is_zipfile(zipbytes):
        print(f"{source_path} is not a zip archive; nothing to extract")
        return {}

    # NOTE: every member is decompressed into memory at once; very large
    # archives may need a streaming approach instead.
    with ZipFile(zipbytes, "r") as myzip:
        return {
            info.filename: myzip.read(info)
            for info in myzip.infolist()
            # Directory entries have no content and no catch/trip/size prefix.
            if not info.is_dir()
        }

In [89]:
@flow(log_prints=False)
def process_gcs_blob(blob: str = "ps_2021_csv.zip"
                     , bucket_name: str = "de_project_bucket"
                     , creds_path: str = "../creds.json") -> None:
    """Unzip ``zip/<blob>`` in ``bucket_name`` and upload each CSV it contains.

    Each archive member is staged locally by ``process_file`` and then
    uploaded by ``write_gcs``.

    Args:
        blob: Name of the zip archive inside the bucket's ``zip/`` folder.
        bucket_name: GCS bucket to read the archive from and write CSVs to.
        creds_path: Path to the service-account JSON key file.
    """
    storage_client = storage.Client.from_service_account_json(creds_path)
    bucket = storage_client.get_bucket(bucket_name)

    csvs = unzip_blob(f"zip/{blob}", bucket)
    for member_name, content in csvs.items():
        print(member_name)
        # Unpack into distinct names so the ``blob`` parameter is not rebound
        # to a GCS Blob object inside the loop.
        filename, local_path, dest_blob = process_file(member_name, content, bucket)
        write_gcs(filename, local_path, dest_blob)

    # To process every archive in the bucket instead, iterate over the names
    # returned by read_gcs_bucket(bucket) and call unzip_blob on each one.


 `@flow(name='my_unique_name', ...)`


In [90]:
if __name__ == '__main__':
    # Run the flow against a single archive in the bucket's zip/ folder.
    blob, bucket_name = "ps_1981_1989_csv.zip", "de_project_bucket"
    process_gcs_blob(blob=blob, bucket_name=bucket_name)

#57min with convert to parquet @ 3.63GB


catch_19811.csv


Csv file size is 16


trip_19811.csv


Csv file size is 16


size_19811.csv


Csv file size is 96


catch_19812.csv


Csv file size is 214679


trip_19812.csv


Csv file size is 221693


size_19812.csv


Csv file size is 349282


catch_19813.csv


Csv file size is 1097659


trip_19813.csv


Csv file size is 1039962


size_19813.csv


Csv file size is 1830316


catch_19814.csv


Csv file size is 2343858


trip_19814.csv


Csv file size is 2361426


size_19814.csv


Csv file size is 3644168


catch_19815.csv


Csv file size is 1903945


trip_19815.csv


Csv file size is 1745956


size_19815.csv


Csv file size is 2770057


catch_19816.csv


Csv file size is 682233


trip_19816.csv


Csv file size is 644969


size_19816.csv


Csv file size is 832160


catch_19821.csv


Csv file size is 187148


trip_19821.csv


Csv file size is 189386


size_19821.csv


Csv file size is 179938


catch_19822.csv


Csv file size is 733905


trip_19822.csv


Csv file size is 691875


size_19822.csv


Csv file size is 1062130


catch_19823.csv


Csv file size is 2013613


trip_19823.csv


Csv file size is 1886209


size_19823.csv


Csv file size is 2799895


catch_19824.csv


Csv file size is 3073916


trip_19824.csv


Csv file size is 3006498


size_19824.csv


Csv file size is 4061843


catch_19825.csv


Csv file size is 2178233


trip_19825.csv


Csv file size is 2030075


size_19825.csv


Csv file size is 2974456


catch_19826.csv


Csv file size is 737476


trip_19826.csv


Csv file size is 711298


size_19826.csv


Csv file size is 1198775


catch_19831.csv


Csv file size is 223948


trip_19831.csv


Csv file size is 224706


size_19831.csv


Csv file size is 235947


catch_19832.csv


Csv file size is 999007


trip_19832.csv


Csv file size is 1083263


size_19832.csv


Csv file size is 1348965


catch_19833.csv


Csv file size is 2854150


trip_19833.csv


Csv file size is 2890102


size_19833.csv


Csv file size is 3696865


catch_19834.csv


Csv file size is 2932964


trip_19834.csv


Csv file size is 2859758


size_19834.csv


Csv file size is 4148337


catch_19835.csv


Csv file size is 1908299


trip_19835.csv


Csv file size is 1690831


size_19835.csv


Csv file size is 2812423


catch_19836.csv


Csv file size is 677453


trip_19836.csv


Csv file size is 680510


size_19836.csv


Csv file size is 895457


catch_19841.csv


Csv file size is 528842


trip_19841.csv


Csv file size is 545464


size_19841.csv


Csv file size is 541023


catch_19842.csv


Csv file size is 1002816


trip_19842.csv


Csv file size is 1085959


size_19842.csv


Csv file size is 1287145


catch_19843.csv


Csv file size is 2265231


trip_19843.csv


Csv file size is 2282909


size_19843.csv


Csv file size is 2945922


catch_19844.csv


Csv file size is 2217503


trip_19844.csv


Csv file size is 2093789


size_19844.csv


Csv file size is 3063883


catch_19845.csv


Csv file size is 1643422


trip_19845.csv


Csv file size is 1525414


size_19845.csv


Csv file size is 2226888


catch_19846.csv


Csv file size is 831724


trip_19846.csv


Csv file size is 827292


size_19846.csv


Csv file size is 1089326


catch_19851.csv


Csv file size is 368300


trip_19851.csv


Csv file size is 378817


size_19851.csv


Csv file size is 355965


catch_19852.csv


Csv file size is 1200663


trip_19852.csv


Csv file size is 1368006


size_19852.csv


Csv file size is 1464711


catch_19853.csv


Csv file size is 2455065


trip_19853.csv


Csv file size is 2499380


size_19853.csv


Csv file size is 3206498


catch_19854.csv


Csv file size is 3453919


trip_19854.csv


Csv file size is 3157039


size_19854.csv


Csv file size is 4925026


catch_19855.csv


Csv file size is 2345555


trip_19855.csv


Csv file size is 2086826


size_19855.csv


Csv file size is 3466281


catch_19856.csv


Csv file size is 1090284


trip_19856.csv


Csv file size is 1137123


size_19856.csv


Csv file size is 1255547


catch_19861.csv


Csv file size is 539798


trip_19861.csv


Csv file size is 586780


size_19861.csv


Csv file size is 641495


catch_19862.csv


Csv file size is 1426546


trip_19862.csv


Csv file size is 1600398


size_19862.csv


Csv file size is 1845202


catch_19863.csv


Csv file size is 3331681


trip_19863.csv


Csv file size is 3127699


size_19863.csv


Csv file size is 4849439


catch_19864.csv


Csv file size is 4355519


trip_19864.csv


Csv file size is 3666292


size_19864.csv


Csv file size is 6388617


catch_19865.csv


Csv file size is 3717008


trip_19865.csv


Csv file size is 2705702


size_19865.csv


Csv file size is 6174002


catch_19866.csv


Csv file size is 1622702


trip_19866.csv


Csv file size is 1536624


size_19866.csv


Csv file size is 2357277


catch_19871.csv


Csv file size is 639608


trip_19871.csv


Csv file size is 684041


size_19871.csv


Csv file size is 547258


catch_19872.csv


Csv file size is 1490736


trip_19872.csv


Csv file size is 1498805


size_19872.csv


Csv file size is 1884727


catch_19873.csv


Csv file size is 2888607


trip_19873.csv


Csv file size is 2816914


size_19873.csv


Csv file size is 3946082


catch_19874.csv


Csv file size is 4214399


trip_19874.csv


Csv file size is 4000342


size_19874.csv


Csv file size is 5424936


catch_19875.csv


Csv file size is 3261515


trip_19875.csv


Csv file size is 2959222


size_19875.csv


Csv file size is 4754343


catch_19876.csv


Csv file size is 1325334


trip_19876.csv


Csv file size is 1346543


size_19876.csv


Csv file size is 1656861


catch_19881.csv


Csv file size is 471022


trip_19881.csv


Csv file size is 581693


size_19881.csv


Csv file size is 384420


catch_19882.csv


Csv file size is 1157847


trip_19882.csv


Csv file size is 1322507


size_19882.csv


Csv file size is 1344425


catch_19883.csv


Csv file size is 3193693


trip_19883.csv


Csv file size is 3348895


size_19883.csv


Csv file size is 3944288


catch_19884.csv


Csv file size is 4968795


trip_19884.csv


Csv file size is 4743092


size_19884.csv


Csv file size is 5758286


catch_19885.csv


Csv file size is 3994621


trip_19885.csv


Csv file size is 3540957


size_19885.csv


Csv file size is 4673139


catch_19886.csv


Csv file size is 1953470


trip_19886.csv


Csv file size is 1797875


size_19886.csv


Csv file size is 2509085


catch_19891.csv


Csv file size is 744980


trip_19891.csv


Csv file size is 775687


size_19891.csv


Csv file size is 759322


catch_19892.csv


Csv file size is 1479496


trip_19892.csv


Csv file size is 1747974


size_19892.csv


Csv file size is 1699396


catch_19893.csv


Csv file size is 4051412


trip_19893.csv


Csv file size is 4261318


size_19893.csv


Csv file size is 5228423


catch_19894.csv


Csv file size is 6249713


trip_19894.csv


Csv file size is 6292296


size_19894.csv


Csv file size is 7643949


catch_19895.csv


Csv file size is 4623031


trip_19895.csv


Csv file size is 4482348


size_19895.csv


Csv file size is 6220972


catch_19896.csv


Csv file size is 1497254


trip_19896.csv


Csv file size is 1548794


size_19896.csv


Csv file size is 1777045
