Functions needed to bulk insert rows into Carto

In [None]:
def sql_api(url, sql, key):
    """Send one SQL statement to the Carto SQL API via an HTTP GET.

    Args:
        url: endpoint of the SQL API.
        sql: SQL text, sent as the ``q`` query parameter.
        key: API key, sent as the ``api_key`` query parameter.

    Returns:
        The response object returned by ``req.get``.
    """
    query = {'api_key': key, 'q': sql}
    return req.get(url, params=query)

def dump_row_contents(row, cols_and_types):
    """Format one dataframe row as a parenthesised SQL VALUES tuple.

    Args:
        row: pandas Series (a single dataframe row) whose index holds the
            column names.
        cols_and_types: mapping of column name -> column type string.

    Returns:
        A string such as ``('abc',1.5)``. Values whose column type is
        ``"date"`` or ``"varchar"`` are wrapped in single quotes, with any
        embedded single quote doubled; every other value is emitted unquoted
        via ``str()``. A row with no columns yields ``()``.

    Raises:
        KeyError: if a column of ``row`` is missing from ``cols_and_types``.
    """
    quoted_types = ("date", "varchar")
    fields = []
    for ix in row.index:
        text = str(row[ix])
        if cols_and_types[ix] in quoted_types:
            # Double embedded single quotes so a value like O'Neil cannot
            # terminate the SQL string literal early.
            text = "'" + text.replace("'", "''") + "'"
        fields.append(text)
    # join() also handles the empty row correctly, where trimming a trailing
    # comma would have removed the opening parenthesis instead.
    return "(" + ",".join(fields) + ")"

def update_in_batches(data_df, batch_size, target_table_name, cols_and_types, cols_with_apostrophes=None):
    """
    Send new rows for Carto in smaller batch sizes.
    A batch_size of 20 seems to work for the location data.

    Args:
        data_df: dataframe holding the rows to insert; its column names are
            used as the INSERT column list.
        batch_size: maximum number of rows per INSERT statement.
        target_table_name: name of the table to insert into.
        cols_and_types: mapping of column name -> column type, passed on to
            dump_row_contents.
        cols_with_apostrophes: optional list of columns whose apostrophes are
            replaced with &#8217 before sending.

    Returns:
        None. Progress, and any error response together with the offending
        batch, is printed. Sending stops at the first batch whose response
        text contains "error".
    """

    # Define column names. Joined explicitly because str(tuple(...)) leaves a
    # trailing comma for a single column, which is not valid SQL.
    columns = "(" + ", ".join(str(col) for col in data_df.columns) + ")"

    total_rows = data_df.shape[0]

    # Step through the frame by start offset so that no empty trailing batch
    # is produced (also covers an empty data_df: nothing is sent at all).
    for start in range(0, total_rows, batch_size):
        stop = min(start + batch_size, total_rows)

        # Select sub-dataframe
        sub_df = data_df.iloc[start:stop]

        # Replace apostrophes from varchar columns with &#8217
        sub_df = toggle_apostrophes(sub_df, cols_with_apostrophes, remove=True)

        # Create Insert SQL statement
        values = ", ".join(sub_df.apply(lambda row: dump_row_contents(row, cols_and_types), axis=1))
        insert_value_sql = """
        INSERT INTO {table_name} {columns} VALUES {values}
        """.format(table_name=target_table_name, columns=columns, values=values)

        res = sql_api(carto_url, insert_value_sql, carto_api_token)

        # Help with trouble shooting
        # Display response error, the data that created the error, and break the cycle
        if "error" in res.text:
            print(res.text)
            print(sub_df)
            break

        print("Completed up until index:", stop)
        
def keep_geolocated(df):
    """Return only the rows of df whose latitude and longitude are both non-null."""
    has_latitude = pd.notnull(df["latitude"])
    has_longitude = pd.notnull(df["longitude"])
    return df.loc[has_latitude & has_longitude]

def fix_precision_of_floats(df, float_columns, precision):
    """Return a copy of df with each column in float_columns rounded to `precision` decimals.

    Used to avoid spurious mismatches when comparing numpy floats that
    differ only by rounding error. The input dataframe is left untouched.
    """
    rounded = df.copy()
    for name in float_columns:
        rounded[name] = np.around(rounded[name], precision)
    return rounded

def toggle_apostrophes(df, cols_with_apostrophes, remove=True):
    """
    Reversibly swap the ' and &#8217 representations of an apostrophe.

    With remove=True every ' in the listed columns becomes &#8217; with
    remove=False the substitution runs the other way. Each value is passed
    through str() first. Works on a copy, so the caller's df is unchanged.
    A falsy cols_with_apostrophes (e.g. None) means no columns are touched.

    TO DO: Address how this affects our data storage... &#8217 will be in Carto table

    """
    apostrophe, entity = "'", "&#8217"

    # Pick the direction of the substitution once, up front
    if remove:
        old, new = apostrophe, entity
    else:
        old, new = entity, apostrophe

    converted = df.copy()

    # Falling back to an empty list avoids the "NoneType not iterable" error
    for name in cols_with_apostrophes or []:
        converted[name] = converted[name].apply(lambda value: str(value).replace(old, new))
    return converted

def update_table_without_duplicates(data_df, target_table_name, cols_and_types, float_cols=["latitude", "longitude"], precision=8, cols_with_apostrophes=None, consider_partial_history=None):
    """
    Determines whether there are new observations to add to the table.
    Sends an SQL statement and returns the result of that operation to stdout.

    Args:
        data_df: dataframe of candidate observations; must contain every
            column named in cols_and_types plus "latitude" and "longitude".
        target_table_name: name of the Carto table to compare against and
            insert into.
        cols_and_types: mapping of column name -> column type. Its keys
            define the columns that are compared and inserted.
        float_cols: columns rounded before comparison (never mutated here).
        precision: number of decimals the float_cols are rounded to, on both
            the existing table and the new observations.
        cols_with_apostrophes: optional columns stored with &#8217 in place
            of apostrophes; converted back before comparing.
        consider_partial_history: when truthy, de-dupe only against rows
            matching ``lastUpdated < <this value>`` instead of the whole
            table, i.e. against a limited part of the existing record.

    Returns:
        None. The number of new observations is printed and the new rows are
        inserted in batches of 20.
    """
    # column names to include
    column_names = list(cols_and_types.keys())

    # Read in existing table, restricted to a partial history if desired
    # Particularly for updating OpenAQ history, as opposed to locations list
    # (consider_partial_history=None)
    # NOTE: this is confusing parameter use. Consider a better formulation.

    if not consider_partial_history:
        # Update against entire table
        select_all_sql = """
        SELECT * FROM {table_name}
        """.format(table_name=target_table_name)
    else:

        ##
        ### TO DO: ensure that this works!
        ##

        # Update against partial history
        select_all_sql = """
        SELECT * FROM {table_name} WHERE lastUpdated < {length_of_partial_history}
        """.format(table_name=target_table_name, length_of_partial_history=consider_partial_history)

    res = sql_api(carto_url, select_all_sql, carto_api_token)

    target_table = pd.DataFrame(res.json()["rows"], columns=column_names)

    # Fix precision on float columns (honour the caller's precision rather
    # than a hard-coded value)
    target_table = fix_precision_of_floats(target_table, float_cols, precision)

    # Fix apostrophe change
    target_table = toggle_apostrophes(target_table, cols_with_apostrophes, remove=False)

    # Determine unique observations in data_df (de-dupe in the new observations)
    # http://pandas.pydata.org/pandas-docs/version/0.17/generated/pandas.DataFrame.drop_duplicates.html
    obs = data_df[column_names]
    obs = obs.drop_duplicates(keep="first")
    obs = keep_geolocated(obs)
    # Same precision as the existing table so equal values compare equal
    obs = fix_precision_of_floats(obs, float_cols, precision)

    # De-dupe between existing table and new observations
    # https://stackoverflow.com/questions/29464234/compare-python-pandas-dataframes-for-matching-rows
    # Rows present in both frames get key == "x"; after the left merge the
    # rows of obs with a null key are the ones missing from the table.
    shared = pd.merge(target_table, obs, on=column_names, how="inner")
    shared["key"] = "x"
    temp_df = pd.merge(obs, shared, on=column_names, how="left")
    new_obs = temp_df[temp_df["key"].isnull()].drop("key", axis=1)

    print("Number of new observations added to", target_table_name + ":", new_obs.shape[0])

    # Add genuinely new observations to the existing table
    update_in_batches(new_obs, 20, target_table_name, cols_and_types, cols_with_apostrophes)