In [1]:
from configparser import ConfigParser
import datetime
import logging
from pathlib import Path
import pkgutil
import sys
from typing import Optional, Union
from zipfile import ZIP_DEFLATED, ZipFile

import arcpy

In [2]:
# project root is the parent of the directory the notebook is running from
dir_prj = Path.cwd().parent

# input locations: the ad hoc export and the parquet data inside it
in_dir = dir_prj / 'data' / 'ad_hoc' / 'export20231003'
in_pqt = in_dir / 'parquet'

# output locations: file geodatabase, the feature class inside it, and the zipped deliverable
out_dir = dir_prj / 'data' / 'processed' / 'dan20231003'
output_fgdb = out_dir / 'foursquare.gdb'
out_fc = output_fgdb / 'places'
zip_pth = out_dir / 'foursquare_geoenrichment.zip'

src_dir = dir_prj / 'src'

# put the project's src directory first on the import path so the local module can be imported
sys.path.insert(0, str(src_dir))

# must follow the sys.path change above, which is why it is not with the other imports
from arcpy_parquet import parquet_to_feature_class

# create the output directory together with any missing parents; a no-op when it already exists
out_dir.mkdir(parents=True, exist_ok=True)

# fail early, with a readable message, if the expected directories are not in place
assert in_dir.exists(), f'Input directory does not exist: {in_dir}'
assert out_dir.exists(), f'Output directory does not exist: {out_dir}'

In [3]:
logger = logging.getLogger('dan20231003')

# logging.getLogger returns the same logger object for the same name, so re-running this cell
# would otherwise stack another FileHandler and write every message multiple times; detach and
# close any file handlers left from an earlier run before attaching a fresh one
for stale_handler in list(logger.handlers):
    if isinstance(stale_handler, logging.FileHandler):
        logger.removeHandler(stale_handler)
        stale_handler.close()

# one log file per day, saved next to the outputs
timestamp_str = datetime.datetime.today().strftime('%Y%m%d')
fh = logging.FileHandler(str(out_dir / f"dan20231003_{timestamp_str}.log"))
fh.setFormatter(logging.Formatter('%(asctime)s | %(name)s | %(levelname)s | %(message)s'))
logger.addHandler(fh)

logger.setLevel(logging.INFO)

# also pass records up to the root logger's handlers
logger.propagate = True

In [4]:
# start from a clean slate: drop any geodatabase left by an earlier run to avoid corruption issues
fgdb_pth_str = str(output_fgdb)
if arcpy.Exists(fgdb_pth_str):
    arcpy.management.Delete(fgdb_pth_str)

# build a new, empty file geodatabase, logging before and after
logger.info(f'Starting creation of File Geodatabase - {output_fgdb}')
arcpy.management.CreateFileGDB(
    str(output_fgdb.parent),  # folder the geodatabase is created in
    output_fgdb.stem,         # geodatabase name without the .gdb suffix
)
logger.info('Finished creation of File Geodatabase.')

In [5]:
def get_schema_csv(schema_dir: Union[Path, str]) -> Path:
    """
    Locate the schema CSV file written as a single part file by Spark.

    Args:
        schema_dir: Either a directory containing one or more ``part-*.csv`` files, or the
            path to a ``*.csv`` file itself.

    Returns:
        Path to the schema CSV. When a directory holds several part files, the first one in
        sorted name order is returned so the result is the same on every run.

    Raises:
        ValueError: If the directory contains no ``part-*.csv`` file (only the directory
            itself is searched, not subdirectories), or if the path is neither a directory
            nor a path ending in ``.csv``.
    """
    # ensure we are working with a Path object, whatever path-like was passed in
    schema_dir = Path(schema_dir)

    # directory input: pick the part file
    if schema_dir.is_dir():

        # glob yields files in arbitrary order, so sort to make the choice reproducible
        part_files = sorted(schema_dir.glob('part-*.csv'))

        if not part_files:
            raise ValueError('Cannot locate a part*.csv file in the directory tree.')

        return part_files[0]

    # file input: hand the csv path straight back (suffix matched case-insensitively)
    if schema_dir.suffix.lower() == '.csv':
        return schema_dir

    # anything else cannot be interpreted as a schema
    raise ValueError('Cannot locate a schema *.csv file.')

# resolve the schema csv from the schema directory of the export
schema_src_dir = in_dir / 'schema'
schema_csv = get_schema_csv(schema_src_dir)

# display the resolved path as the cell output
schema_csv

WindowsPath('C:/projects/foursquare-data-loading/data/ad_hoc/export20231003/schema/part-00000-9e7cf37a-7f77-4e1d-a4ee-a1e217b67942-c000.csv')

In [6]:
# record the start of the import in the log
logger.info('Starting parquet data import.')

# collect the conversion settings in one place, then hand them to the converter
conversion_params = {
    'parquet_path': in_pqt,
    'output_feature_class': out_fc,
    'schema_file': schema_csv,
    'spatial_reference': 3857,  # NOTE(review): presumably a WKID (3857 = Web Mercator) - confirm
    'build_spatial_index': True,
    'logger': logger,
}

# last expression of the cell, so the returned value is displayed as the cell output
parquet_to_feature_class(**conversion_params)

WindowsPath('C:/projects/foursquare-data-loading/data/processed/dan20231003/foursquare.gdb/places')

In [7]:
# compress the file geodatabase; ZIP_DEFLATED has to be requested explicitly because ZipFile
# defaults to ZIP_STORED (no compression), for which compresslevel has no effect
with ZipFile(zip_pth, mode='w', compression=ZIP_DEFLATED, compresslevel=9) as zipper:

    # iterate the files comprising the file geodatabase and add to the archive
    for gdb_file in out_dir.glob('**/*.gdb/**/*'):

        # store paths relative to the output directory so the archive unpacks to <name>.gdb/...
        target_pth = str(gdb_file.relative_to(out_dir))
        zipper.write(gdb_file, target_pth)

logger.info(f'Successfully created archive {zip_pth}.')