Extract Process #1

This process downloads data from a URL and then uploads that data to a
Google Cloud Storage bucket. The process expects the following environment
variables to be set:

* GOOGLE_APPLICATION_CREDENTIALS
* PIPELINE_DATA_BUCKET

In [None]:
# Load process environment variables from file named ".env". Environment vars
# will be available in the os.environ dictionary.
from dotenv import load_dotenv
load_dotenv()

# Import additional required packages
import datetime as dt
import os
import requests
from google.cloud import storage

# Query URL for the Master_Parcels layer. Every filter condition has to live
# inside the single `where` parameter, joined with AND: separating them with
# `&` splits them into unrelated query-string parameters and leaves `where`
# as just `1 = 1`, so the City / Current_ conditions are never applied.
focus_path = r"https://services2.arcgis.com/AhHMUmDoudKVXiUl/ArcGIS/rest/services/Master_Parcels/FeatureServer/0/query?where=1%20=%201%20AND%20City%20=%20'Walnut%20Creek'%20AND%20Current_%20=%20'TRUE'&outFields=APN,Current_,Address,City&returnGeometry=false&f=json"

# Imported unconditionally so `json` is available at module level even when
# the download below does not return HTTP 200.
import json

try:
    # Retrieve data from URL
    print('Downloading the parcels data...')
    response = requests.get(focus_path)
    print(response)

    # Turn 4xx/5xx answers into an HTTPError so they are reported below.
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    print(err)

# Only parse the body when the request succeeded; `response` is still bound
# after an HTTPError because raise_for_status() runs after the assignment.
if response.status_code == 200:
    response_json = json.loads(response.content.decode('utf8'))

In [None]:
# Layer-level query endpoint of the Master_Parcels feature service.
# (A service-level ".../FeatureServer/query?...&returnCountOnly=true&...&f=json"
# URL used to be assigned to this name first, but it was overwritten on the
# very next line without being used, so that dead assignment was removed.)
featureservice_path = "https://services2.arcgis.com/AhHMUmDoudKVXiUl/ArcGIS/rest/services/Master_Parcels/FeatureServer/0/query"
response_count = requests.get(featureservice_path)

In [None]:
def get_featureservice_count(
    feature_path = "https://services2.arcgis.com/AhHMUmDoudKVXiUl/ArcGIS/rest/services/Master_Parcels/FeatureServer/0/query",
    where_list = [
        "City = 'Walnut Creek'", "Current_ = 'TRUE'"
]):
    """Return the number of features matching the given conditions.

    Sends a GET request to ``feature_path`` with ``returnCountOnly=true`` and
    returns the value stored under the ``'count'`` key of the JSON answer.

    Args:
        feature_path: URL of the feature layer's ``query`` endpoint.
        where_list: SQL conditions; they are joined with ``' AND '`` to form
            the ``where`` parameter. The default list is only read, never
            modified.

    Returns:
        The ``'count'`` value from the decoded JSON response.

    Raises:
        requests.exceptions.HTTPError: if the server answers with a 4xx/5xx
            status.
        RuntimeError: if the request did not fail but the status is not 200.
        KeyError: if the JSON body has no ``'count'`` key.
    """
    import json

    where_sql = ' AND '.join(where_list)

    params_dict = {
        "where": where_sql,
        "returnCountOnly": "true",
        "outFields": "APN",
        'f': 'json'
    }

    # Query the path handed to this function (not a module-level global), so
    # the `feature_path` argument is actually honoured.
    response = requests.get(
        feature_path,
        params=params_dict)

    # Check the status of *this* request and raise a real exception; a bare
    # `raise` with no active exception would only produce a confusing
    # "No active exception to reraise" RuntimeError.
    response.raise_for_status()
    if response.status_code != 200:
        raise RuntimeError(
            f"Unexpected HTTP status {response.status_code} from {feature_path}")

    feature_size_count = json.loads(response.content.decode('utf8'))['count']
    return feature_size_count

def get_arcgis_json_df(
    rest_api_path,
    offset = 0,
    where_list = [
        "City = 'Walnut Creek'", "Current_ = 'TRUE'"],
    params_dict = {
        "returnGeometry":"false",
        "outFields":"APN,Current_,Address,City",
        "geometryType":"esriGeometryPolygon",
        'f': 'json'
    }):
    """Fetch one page of features and return their attributes as a DataFrame.

    Sends a GET request to ``rest_api_path`` and builds a DataFrame with one
    row per entry of the response's ``'features'`` list, using each entry's
    ``'attributes'`` mapping as the row.

    Args:
        rest_api_path: URL of the feature layer's ``query`` endpoint.
        offset: value sent as the ``resultOffset`` query parameter.
        where_list: SQL conditions joined with ``' AND '`` to form the
            ``where`` parameter. The default list is only read.
        params_dict: base query parameters. It is copied, never modified.

    Returns:
        pandas.DataFrame built from the ``'attributes'`` of every feature.

    Raises:
        requests.exceptions.HTTPError: if the server answers with a 4xx/5xx
            status (the error is printed before it is re-raised).
        RuntimeError: if the request did not fail but the status is not 200.
    """
    import json

    import pandas as pd

    # Work on a copy: writing 'where'/'resultOffset' straight into
    # `params_dict` would alter the shared default dict (and any dict passed
    # in by the caller) across calls.
    request_params = dict(params_dict)
    request_params['where'] = ' AND '.join(where_list)
    request_params['resultOffset'] = offset

    # Retrieve data from URL
    response = requests.get(
        rest_api_path,
        params = request_params
        )

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        # Report the failure, then propagate the real error instead of
        # swallowing it and hitting a bare `raise` with no active exception.
        print(err)
        raise

    if response.status_code != 200:
        raise RuntimeError(
            f"Unexpected HTTP status {response.status_code} from {rest_api_path}")

    response_json = json.loads(response.content.decode('utf8'))

    extract_df = pd.DataFrame(data=[
        row['attributes'] for row in response_json['features']
        ])
    return extract_df

# `math` and `pd` are used at module level below; the functions above only
# import pandas inside their own bodies, which does not bind `pd` here.
import math

import pandas as pd

featureservice_path = r"https://services2.arcgis.com/AhHMUmDoudKVXiUl/ArcGIS/rest/services/Master_Parcels/FeatureServer/0/query"

feature_count = get_featureservice_count(featureservice_path)
print(feature_count)
# NOTE(review): presumably the service's maximum record count per request --
# confirm against the layer's settings.
max_page_size = 2000
# Ceiling division: exactly as many pages as are needed to cover every
# feature, without an extra (empty) request when the count is an exact
# multiple of the page size.
pages = math.ceil(feature_count / max_page_size)

# Collect the pages and concatenate once at the end; concatenating inside the
# loop re-copies all previously downloaded rows on every iteration.
page_frames = []

for current_page in range(0,pages):
    current_offset_count = current_page*max_page_size

    current_page_df = get_arcgis_json_df(
        featureservice_path,
        offset = current_offset_count
        )

    page_frames.append(current_page_df)

# pd.concat() refuses an empty list, so fall back to an empty frame.
featureservice_df = pd.concat(page_frames) if page_frames else pd.DataFrame()
len(featureservice_df['APN'].unique())

In [None]:
def get_featureservice_count(
    featureservice_path = "https://services2.arcgis.com/AhHMUmDoudKVXiUl/ArcGIS/rest/services/Master_Parcels/FeatureServer/0/query",
    where_list = [
        "1=1", "City = 'Walnut Creek'", "Current_ = 'TRUE'"]):
    """Return the number of features matching the given conditions.

    Sends a GET request to ``featureservice_path`` with
    ``returnCountOnly=true`` and returns the value stored under the
    ``'count'`` key of the JSON answer.

    Args:
        featureservice_path: URL of the feature layer's ``query`` endpoint.
        where_list: SQL conditions joined with ``' AND '`` to form the
            ``where`` parameter. The default list is only read.

    Returns:
        The ``'count'`` value from the decoded JSON response.

    Raises:
        requests.exceptions.HTTPError: if the server answers with a 4xx/5xx
            status.
        KeyError: if the JSON body has no ``'count'`` key.
    """
    # Imported here so the function does not rely on `json` happening to be
    # bound at module level by some other part of the file.
    import json

    params_dict = {
        'where': ' AND '.join(where_list),
        "returnCountOnly": "true",
        'f': 'json',
    }

    response = requests.get(
        featureservice_path,
        params=params_dict)

    # Surface HTTP failures directly instead of trying to parse an error page.
    response.raise_for_status()

    feature_size_count = json.loads(response.content.decode('utf8'))['count']

    return feature_size_count

def get_featureservice_df(
    rest_api_path,
    offset = 0,
    where_list = [
        "1=1", 
        "City = 'Walnut Creek'", "Current_ = 'TRUE'"],
    params_dict = {
        "returnGeometry":"false",
        "outFields":"APN,Current_,Address,City",
        "geometryType":"esriGeometryPolygon"
    }):
    """Fetch one page of features and return their attributes as a DataFrame.

    Sends a GET request to ``rest_api_path`` (always asking for ``f=json``)
    and builds a DataFrame with one row per entry of the response's
    ``'features'`` list, using each entry's ``'attributes'`` mapping as the
    row.

    Args:
        rest_api_path: URL of the feature layer's ``query`` endpoint.
        offset: value sent as the ``resultOffset`` query parameter.
        where_list: SQL conditions joined with ``' AND '`` to form the
            ``where`` parameter. The default list is only read.
        params_dict: base query parameters. It is copied, never modified.

    Returns:
        pandas.DataFrame built from the ``'attributes'`` of every feature.

    Raises:
        requests.exceptions.HTTPError: if the server answers with a 4xx/5xx
            status (the error is printed before it is re-raised).
        RuntimeError: if the request did not fail but the status is not 200.
    """
    import json

    import pandas as pd

    # Work on a copy: writing the per-request keys straight into
    # `params_dict` would alter the shared default dict (and any dict passed
    # in by the caller) across calls.
    request_params = dict(params_dict)
    request_params['where'] = ' AND '.join(where_list)
    request_params['resultOffset'] = offset
    request_params['f'] = 'json'

    # Retrieve data from URL
    response = requests.get(
        rest_api_path,
        params = request_params
        )

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        # Report the failure, then propagate the real error instead of
        # swallowing it and hitting a bare `raise` with no active exception.
        print(err)
        raise

    if response.status_code != 200:
        raise RuntimeError(
            f"Unexpected HTTP status {response.status_code} from {rest_api_path}")

    response_json = json.loads(response.content.decode('utf8'))

    extract_df = pd.DataFrame(data=[
        row['attributes'] for row in response_json['features']
        ])
    return extract_df


In [19]:
import os
# Switch the working directory to the data_pipeline folder before importing
# the local helper module (presumably so the plain import below resolves
# against that folder -- confirm how sys.path is set up in this environment).
os.chdir(r'C:\Users\nelms\Documents\Penn\MUSA-509\MUSA509_Final_ParcelUpdate\airflow\dags\data_pipeline')

import get_arcgis_json_df as arcgis_df

# Query endpoint of the Master_Parcels layer.
featureservice_path = "https://services2.arcgis.com/AhHMUmDoudKVXiUl/ArcGIS/rest/services/Master_Parcels/FeatureServer/0/query"

# Filter conditions handed to the helper as a list.
where_list = [
    "1=1",
    "City = 'Walnut Creek'",
    "Current_ = 'TRUE'",
]

# Base query parameters: attributes only, restricted to four fields.
params_dict = {
    "returnGeometry": "false",
    "outFields": "APN,Current_,Address,City",
    "geometryType": "esriGeometryPolygon",
}

focus_df = arcgis_df.get_featureservice_all(
    featureservice_path,
    where_list=where_list,
    params_dict=params_dict,
)

28953


In [20]:
from google.cloud import bigquery
from google.cloud import storage
import pandas as pd
import geopandas as gpd
import pathlib
import requests
import tempfile

def local_file_to_gcs(local_file_name, gcs_bucket_name, gcs_blob_name, content_type=None):
    """
    Upload a file from the local machine to a Google Cloud Storage blob.

    Prints a progress message naming the destination and the source, then
    creates a storage client, looks up the blob ``gcs_blob_name`` in bucket
    ``gcs_bucket_name`` and uploads ``local_file_name`` into it, forwarding
    ``content_type`` to the upload call.
    """
    destination_uri = f'gs://{gcs_bucket_name}/{gcs_blob_name}'
    print(f'Saving to GCS file {destination_uri} from local file {local_file_name}...')

    # Client -> bucket -> blob, then push the file contents.
    target_blob = storage.Client().bucket(gcs_bucket_name).blob(gcs_blob_name)
    target_blob.upload_from_filename(local_file_name, content_type=content_type)

OSError: could not find or load spatialindex_c-64.dll