<a href="https://us.pg.com/"><img src = "https://bloximages.newyork1.vip.townnews.com/insideradio.com/content/tncms/assets/v3/editorial/2/26/2261ca00-9e63-11e5-88e8-9f74072d2782/5668084fb39ce.image.jpg" width = 400, align = "center"></a>

# GEONAS PYTHON LIBRARY - StoreMaster Prep

## Overview

Verifying the accuracy and remedying store latlong (latitude/longitude) is the first step in Neighbour Analytics. This notebook is a step-by-step guide to implement a workflow to auto-detect issues in store latlong.

In the process of the checks, the workflow will conduct checks and auto-correct some of latlong data, then it will also do a spatial join of shapefiles and storemaster with latlong, produce an output that includes the Province/City/Town that a store falls into.

## Content

<table>
<tr>
    <th>Function</th>
    <th>Description</th>
    <th>Name</th>
    <th>Parameters</th>
</tr>
<tr>
    <td>Read shapes from file</td>
    <td>Reads the shapefile stored in the FileStore into a GeoDataFrame</td>
    <td>shapefile_to_df</td>
    <td><ul>
        <li>shpfile_input_loc (string): path to the shapefile</li>
    </ul></td>
</tr>
<tr>
    <td>Convert shapefile to CSV</td>
    <td>Reads the shapefile and saves it as CSV with WKT-encoded geomatry</td>
    <td>shapefile_to_csv</td>
    <td><ul>
        <li>shpfile_input_loc (string): path to the input shapefile</li>
        <li>csvfile_output_loc (string): path to the output CSV file</li>
    </ul></td>
</tr>
<tr>
    <td>Read shapes from table</td>
    <td>Reads shape from the database table into a Pandas DataFrame</td>
    <td>shapetable_to_df</td>
    <td><ul>
        <li>table_name (string): name of the table containing shapes data, may be prefixed with the database name</li>
        <li>crs (string): Coordinate Reference System for spatial projection of coordinates. Default 'EPSG:4326' for WGS-84</li>
        <li>geometry_col (string): Name of the column in `table_name` table containing polygon encoded in WKT format</li>
    </ul></td>
</tr>
<tr>
    <td>Correct lat/lon</td>
    <td>
        Takes pandas.DataFrame `df` (containing columns `lat_col`, `lon_col`) as input and appends columns `lat_corr_col`, `lon_corr_col` with corrected
        coordinates and a `remark_col` with remark on this correction containing:<br/>
        <ul>
            <li>'String': one of `lat_col`, `lon_col` contains string, that could not be parsed to a float</li>
            <li>'Zeros': both `lat_col` and `lon_col` are zeros</li>
            <li>'Swap lat/lon': values for latitude and longitide were swapped</li>
            <li>'Min/max boundaries': latitude or longitude is a number, but could not be interpreted as a coordinate.</li>
            <li>'Duplicated': There is at least one record in the dataframe with exactly the same latitude and longitude (after swap, if applies).</li>
        </ul>
    </td>
    <td>correct_lat_lon</td>
    <td><ul>
        <li>df (pandas.DataFrame):</li><li>lat_col (str): Latitude column name in `df`</li>
        <li>lon_col (str): Longitude column name in `df`</li>
        <li>lat_corr_col (str): Column name for corrected latitude</li>
        <li>lon_corr_col (str): Column name for corrected longitude</li>
        <li>remark_col (str): Column name for remarks on correction steps</li>
    </ul></td>
</tr>
<tr>
    <td>Geospatial join</td>
    <td>Joins every store from `stores` to the nearest shape from `shapes`</td>
    <td>geospatial_join</td>
    <td><ul>
        <li> shapes (geopandas.GeoDataFrame): GeoDataFrame with column `shapes_geom_col`</li>
        <li>stores_raw (geopandas.GeoDataFrame): GeoDataFrame with store locations in [`lat_col`, `lon_col`] columns</li>
        <li>columns_to_be_joined (str[]): List of columns from `shapes` dataframe to be joined to `stores_raw`</li>
        <li>lat_col (str): Latitude column in `stores_raw` dataframe</li>
        <li>lon_col (str): Longitude column in `stores_raw` dataframe</li>
        <li>lat_corr_col (str): Corrected latitude column</li>
        <li>lon_corr_col (str): Corrected longitude column</li>
        <li>remark_col (str): Column with remarks on transformation, applied to `lat_col`, `lon_col`</li>
        <li>store_geom_col (str): Column with store location to be constructed from `lat_col`, `lon_col`</li>
        <li>shapes_geom_col (str): Column with shape geometry information</li>
        <li>shape_idx_cols (iterable): Set of columns to identify shape</li>
        <li>store_idx_col (str): Column to identify store</li>
    </ul></td>
</tr>
</table>

## Dependencies

Supported way to install geopandas along with dependencies on the cluster is using Conda. Conda is default package manager in Databricks Runtime ML, therefore it is strongly advisable to use [Databricks Runtime ML](https://docs.databricks.com/runtime/mlruntime.html) in the cluster.

If geopandas has been already installed with pip, uninstall it and then install with Conda. Should you not uninstall it, following install command will very likely hang.

In [6]:
%sh
conda install -c conda-forge geopandas==0.7.0 

conda list | grep -i shapely
conda list | grep -i geopandas
conda list | grep -i rtree
conda list | grep -i pyproj

## Imports

Import libraries and verify versions of packages

In [8]:
import geopandas as gpd
import pandas as pd
import shapely

import logging
logging.basicConfig(level=logging.INFO)
logging.getLogger("py4j").setLevel(logging.ERROR)

logging.info("geopandas version: {}".format(gpd.__version__))
logging.info("shapely version: {}".format(shapely.__version__))

## Functions

### shapefile_to_df

In [10]:
def shapefile_to_df(shpfile_input_loc):
    '''
    Loads shapefile to GeoDataFrame
    '''   
    gdf = gpd.read_file(shpfile_input_loc, encoding = 'utf-8')
    print('Loaded {0} shapes from {1}'.format(len(gdf), shpfile_input_loc))
   
    return gdf

### shapefile_to_csv

In [1]:
def shapefile_to_csv(shpfile_input_loc, csvfile_output_loc):
    '''
    Converts shapefile to CSV file. Use it if you want to load shape data to the table from CSV.
    '''
    gdf = gpd.read_file(shpfile_input_loc, encoding = 'utf-8')
    print('Loaded {0} shapes from {1}'.format(len(gdf), shpfile_input_loc))
    df = pd.DataFrame(gdf)
    print('Saving shape data to {0}...'.format(csvfile_output_loc))
    df.to_csv(csvfile_output_loc)
    print('Done')

### shapetable_to_df

In [14]:
%python
def shapetable_to_df(table_name, crs='EPSG:4326', geometry_col='geometry'):
    """
        Reads data from a `table_name` table containing WKT encoded `geometry` column and converts WKT to shapely geometry type
        
        Parameters
        ----------
        table_name: str
        crs: str
            Coordinates projection of geodesic parameters
        geometry_col: str
            Name of WKT geometry column in the input table and in the output GeoDataFrame
        ----------
        Returns: GeoDataFrame with columns as in the source table
        """

    # Reading table containing WKT-encoded geometry column
    wkt_df = spark.sql("SELECT * FROM {}".format(table_name)).toPandas()

    # Converting strings containing serialized geometry objects to shapely.geometry
    geometry = wkt_df[geometry_col].map(shapely.wkt.loads)

    # Dropping WKT geometry column from dataframe
    wkt_df = wkt_df.drop(geometry_col, axis=1)

    # Creating GeoDataFrame with geometry column converted from WKT string
    shapes_df = gpd.GeoDataFrame(wkt_df, crs=crs, geometry=geometry)

    return shapes_df

### correct_lat_lon

In [16]:
def correct_lat_lon(
    df,
    lat_col='lat',
    lon_col='lon',
    lat_corr_col='lat_corr',
    lon_corr_col='lon_corr',
    remark_col='remark',
):
    """
        This function takes pandas.DataFrame `df` (containing columns `lat_col`, `lon_col`) as input and appends `lat_corr_col`, `lon_corr_col`, `remark_col`
        with corrected coordinates and a remark on this correction:
            - 'String': one of `lat_col`, `lon_col` contains string, that could not be parsed to a float;
            - 'Zeros': both `lat_col` and `lon_col` are zeros;
            - 'Swap lat/lon': values for latitude and longitide were swapped;
            - 'Min/max boundaries': latitude or longitude is a number, but could not be interpreted as a coordinate.

        Parameters
        ----------
        df: pandas.DataFrame
        lat_col: str
            Used to determine latitude column in `df`.
        lon_col: str
            Used to determine longitude column in `df`.
        lat_corr_col: str
            Column name for corrected latitude.
        lon_corr_col: str
            Column name for corrected longitude.
        remark_col: str
            Column name for remarks on correction steps.
        """
    def _correct_lat_lon(lat, lon, remarks=[]):
        """
            Helper function for `correct_lat_lon'. Corrects single `(lat, lon)` pair.

            Parameters
            ----------
            lat: obj
                Latitude value.
            lon: obj
                Longitude value
            remarks: list
                List of remark strings.
            """
        if pd.isnull(lat) or pd.isnull(lon):
            return None, None, remarks + ['Missing']
        if isinstance(lat, str) or isinstance(lon, str):
            try:
                lat = float(lat)
                lon = float(lon)
            except Exception as e:
                logging.info(
                    'Could not parse from string lat={}, lon={}'.format(lat, lon))
                return None, None, remarks + ['String']
        if (lat == 0) and (lon == 0):
            logging.info('Remove zeros: lat={}, lon={}'.format(lat, lon))
            return None, None, remarks + ['Zeros']
        # lat +90 to -90 long +180 to -180
        if not ((-90 <= lat <= 90) and (-180 <= lon <= 180)):
            if ((-90 <= lon <= 90) and (-180 <= lat <= 180)):
                logging.info('Swap lat<->lon lat={}, lon={}'.format(lat, lon))
                lat, lon = lon, lat
                return lat, lon, remarks + ['Swap lat/lon']
            else:
                logging.info(
                    'Lat, lon exceed min-max boundaries lat={}, lon={}'.format(lon, lat))
                return None, None, remarks + ['Min/max boundaries']
        return lat, lon, remarks

    #  correct
    df_corr = (df
               .apply(lambda row: _correct_lat_lon(row[lat_col], row[lon_col]), axis=1)
               .apply(pd.Series, index=[lat_corr_col, lon_corr_col, remark_col])
               )

    dupl_mask = df_corr[[lat_corr_col, lon_corr_col]].dropna().duplicated(
        keep=False).reindex(df_corr.index).fillna(False)
    logging.info('Found {} duplicated lat-lon pairs'.format(dupl_mask.sum()))
    df_corr.loc[dupl_mask, remark_col] = df_corr.loc[dupl_mask, remark_col] + pd.Series([['Duplicated'] for i in dupl_mask[dupl_mask].index], index=dupl_mask[dupl_mask].index)
    df = df.join(df_corr, how='left')
    return df

### geospatial_join

In [18]:
def geospatial_join(shapes,
                    stores_raw,
                    columns_to_be_joined=None,
                    lat_col='lattitude',
                    lon_col='longitude',
                    lat_corr_col='lattitude_corr',
                    lon_corr_col='longitude_corr',
                    remark_col='remark',
                    store_geom_col='point_geometry',
                    shapes_geom_col='geometry',
                    region_idx_cols=['ID_3'],
                    store_idx_col='store_code_h'
                    ):
    """
    Performs 2 rounds of geospatial join on `stores_raw` and `shapes` dataframes.
    First, dataframes are joined by exact intersection of `shapes` polygon and `stores_raw` point.
    For those not matched  `_sjoin_min_distance` is applied.

    Parameters:
    -----------
    shapes:
        GeoDataFrame with column `shapes_geom_col`.
    stores_raw:
        GeoDataFrame with store locations in [`lat_col`, `lon_col`] columns.
    columns_to_be_joined:
        List of columns from `shapes` dataframe to be joined to `stores_raw`.
    lat_col:
        Latitude column.
    lon_col:
        Longitude column.
    lat_corr_col:
        Corrected latitude column.
    lon_corr_col:
        Corrected longitude column.
    remark_col:
        Column with remarks on transformation, applied to `lat_col`, `lon_col`.
    store_geom_col:
        Column with store location to be constructed from `lat_col`, `lon_col`.
    shapes_geom_col:
        Column with shape geometry information.
    shape_idx_cols: iterable
        Set of columns to identify shape.
    store_idx_col: str
        Column to identify store.
    """
    shapes_cols = region_idx_cols + [shapes_geom_col]
    store_cols = [store_idx_col, store_geom_col]

    if columns_to_be_joined is None:
        columns_to_be_joined = list(shapes.columns)

    # correct lat/lon
    stores = stores_raw.pipe(correct_lat_lon,
                             lat_col=lat_col,
                             lon_col=lon_col,
                             lat_corr_col=lat_corr_col,
                             lon_corr_col=lon_corr_col,
                             remark_col='remark')
    logging.info("correct latlong success")
    # convert pandas to geopandas
    mask = stores[[lat_corr_col, lon_corr_col]].notnull().any(axis=1)
    stores[store_geom_col] = gpd.points_from_xy(
        stores[lon_corr_col], stores[lat_corr_col])
    stores = gpd.GeoDataFrame(stores).set_geometry(store_geom_col)
    stores.crs = shapes.crs
    logging.info("definition success")
    # join shapes with store locations
    joined = gpd.sjoin(shapes, stores,
                       how="right", op='intersects').set_index(store_idx_col)
    logging.info("gpd join success")
    # join with min distance
    logging.info(region_idx_cols)

    candidate_stores = joined.index[joined[region_idx_cols].isnull().any(
        axis=1)].unique()

    logging.info("joined index success")

    mask = stores[store_idx_col].isin(candidate_stores)

    try:
        res = _sjoin_min_distance(
            shapes[shapes_cols],
            stores.loc[mask, store_cols],
            store_geom_col=store_geom_col,
            shapes_geom_col=shapes_geom_col,
            shape_idx_cols=region_idx_cols,
            store_idx_col=store_idx_col
        )
        logging.info("min distance success")
        res = res.set_index(store_idx_col)
        joined.loc[res.index, region_idx_cols] = res[region_idx_cols]
        joined.loc[res.index, remark_col] += ['Min distance']
    except:
        logging.info("No  min distance")
    # join store counts for each shape
    store_counts_ser = (joined
                        .groupby(region_idx_cols).size())
    joined = gpd.GeoDataFrame(pd.merge(joined, store_counts_ser.to_frame('store_count'), how='left',
                                       left_on=region_idx_cols, right_index=True))

    # Resetting index on joined dataframe so it becames a columns like in original stores dataframe
    # Merging joined dataframe containing extended stores data with shapes dataframeas follows:
    # - from joined take only columns which were in stores at the beginning + shapes' key columns
    # - from shapes take only columns to be joined + key columns
    joined = pd.merge(joined.reset_index()[list(stores.columns) + region_idx_cols], shapes[set(columns_to_be_joined + region_idx_cols)], on=region_idx_cols, how='left')
    # Converting remark column from list to string
    joined.loc[:, remark_col] = joined.loc[:, remark_col].str.join(', ')
    # Sorting output columns: stores columns first, then columns to be joined from shapes
    # Filtering out shapes index columns if they are not supposed to be included in the output
    joined = joined[list(stores.columns) + columns_to_be_joined]

    return joined

## Usage
### Read in Shapefile
You can read shapefile directly into the GeoDataframe

In [20]:
# File in databricks filestore
SHAPES_FILE = "/FileStore/tables/gadm36_JPN_2.shp"

df = shapefile_to_df(SHAPES_FILE)
df.head()

### Convert shapefile to CSV

If you cannot load shapefile directly from the FileStore please follow this steps:
- convert shapefile locally using shapefile_to_csv function
- load CSV file to the FileStore
- create a table containing the shape data from CSV

In [2]:
import geopandas as gpd
import pandas as pd

SHAPES_FILE = r"C:\gadm36_JPN_shp\gadm36_JPN_2.shp"
CSV_FILE = r"C:\gadm36_JPN_shp\gadm36_JPN_2.csv"

shapefile_to_csv(SHAPES_FILE, CSV_FILE)

Loaded 1811 shapes from C:\gadm36_JPN_shp\gadm36_JPN_2.shp
Saving shape data to C:\gadm36_JPN_shp\gadm36_JPN_2.csv...
Done


### Read in shapes data from the table

You can read shapes data from the table instead of the shapefile. Table has to have a column containing Polygons in WKT format.

In [24]:
%python
SHAPES_TABLE = "default.gadm36_jpn_2"

shapes = shapetable_to_df(SHAPES_TABLE)
shapes.head()

Unnamed: 0,ID,GID_0,NAME_0,GID_1,NAME_1,NL_NAME_1,GID_2,NAME_2,VARNAME_2,NL_NAME_2,TYPE_2,ENGTYPE_2,CC_2,HASC_2,geometry
0,0,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.1_1,Agui,,阿久比町,Machi,Town,,,"POLYGON ((136.88028 34.91983, 136.88237 34.921..."
1,1,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.2_1,Aisai,,愛西市,Shi,City,,,"POLYGON ((136.70439 35.11997, 136.69835 35.122..."
2,2,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.3_1,Anjō,,安城市,Shi,City,,,"POLYGON ((137.12468 34.99055, 137.12135 34.989..."
3,3,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.4_1,Chiryū,,知立市,Shi,City,,,"POLYGON ((137.06000 34.97676, 137.05640 34.976..."
4,4,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.5_1,Chita,,知多市,Shi,City,,JP.AI.CG,"POLYGON ((136.82556 34.94000, 136.82584 34.940..."


### Load store/site data into DataFrame

Your store or site data has to have at least a key column and columns containing latitude and longitude information. You can load it from any source (table, CSV file, etc.) to a Pandas Dataframe.

In [26]:
stores_src = spark.sql("SELECT * FROM default.jpnstoremaster_sample smpl").toPandas()
stores_src.head()

Unnamed: 0,Site ID,Latitude,Longitude
0,1,43.0698,141.2648
1,2,43.0763,141.2879
2,3,43.1355,141.3314
3,4,42.8289,141.6481
4,5,42.9001,141.5794


### Correct latitude and longitude values

Use correct_lat_lon function to clean lat / lon columns containing invalid values. Remark column in the output will contain list of remarks on errors in the data, e.g.:
- String - cannot be converted to a number
- Zeros - lat = 0, lon = 0, possibly invalid or empty data
- Duplicated - there are other records with the same latitude and longitude
- Swap lat/lon - latitude and longitude are swapped in the source
- Min/max boundaries - latitude > 90 or latitude < -90 or longitude > 180 or longitude < -180

In [28]:
stores_invalid = {
    'site_id': [1, 2, 3, 4, 5, 6, 7, 8],
    'lat': [11, '41.2N', 0, 99, 100, 80, 90, 80],
    'lon': [12, '131.3E', 0, -77, 80, 100, -181, 100],
    'desc': [
        'Correct data',
        'Strings in lat / lon',
        'Zeros (formally correct lat/lon but not for our purpose)',
        'Swapped lat and lon',
        'Swapped lat and lon, has duplicate',
        'Duplicate of previous entry',
        'Longitude exceeds max',
        'Another duplicate'
    ]
}
stores_invalid_df = pd.DataFrame(stores_invalid, columns = ['site_id', 'lat', 'lon', 'desc'] )

stores_cor_df = correct_lat_lon(stores_invalid_df)

stores_cor_df.head(len(stores_invalid_df))

Unnamed: 0,site_id,lat,lon,desc,lat_corr,lon_corr,remark
0,1,11,12,Correct data,11.0,12.0,[]
1,2,41.2N,131.3E,Strings in lat / lon,,,[String]
2,3,0,0,Zeros (formally correct lat/lon but not for ou...,,,[Zeros]
3,4,99,-77,Swapped lat and lon,-77.0,99.0,[Swap lat/lon]
4,5,100,80,"Swapped lat and lon, has duplicate",80.0,100.0,"[Swap lat/lon, Duplicated]"
5,6,80,100,Duplicate of previous entry,80.0,100.0,[Duplicated]
6,7,90,-181,Longitude exceeds max,,,[Min/max boundaries]
7,8,80,100,Another duplicate,80.0,100.0,[Duplicated]


### Do geospatial join

Call geospatial_join method to add region data to each store record.

In [30]:
stores = geospatial_join(
    shapes,
    stores_src,
    lat_col='Latitude',
    lon_col='Longitude',
    region_idx_cols=['GID_1', 'GID_2'],
    store_idx_col='Site ID'
    )

stores.head()

Unnamed: 0,Site ID,Latitude,Longitude,lattitude_corr,longitude_corr,remark,point_geometry,ID,GID_0,NAME_0,GID_1,NAME_1,NL_NAME_1,GID_2,NAME_2,VARNAME_2,NL_NAME_2,TYPE_2,ENGTYPE_2,CC_2,HASC_2,geometry
0,1899,34.9365,136.922,34.9365,136.922,,POINT (136.92200 34.93650),0,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.1_1,Agui,,阿久比町,Machi,Town,,,"POLYGON ((136.88028 34.91983, 136.88237 34.921..."
1,2225,34.9562,136.9152,34.9562,136.9152,,POINT (136.91520 34.95620),0,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.1_1,Agui,,阿久比町,Machi,Town,,,"POLYGON ((136.88028 34.91983, 136.88237 34.921..."
2,9957,34.9203,136.9186,34.9203,136.9186,,POINT (136.91860 34.92030),0,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.1_1,Agui,,阿久比町,Machi,Town,,,"POLYGON ((136.88028 34.91983, 136.88237 34.921..."
3,2921,35.1936,136.7534,35.1936,136.7534,,POINT (136.75340 35.19360),1,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.2_1,Aisai,,愛西市,Shi,City,,,"POLYGON ((136.70439 35.11997, 136.69835 35.122..."
4,5049,35.1936,136.752,35.1936,136.752,,POINT (136.75200 35.19360),1,JPN,Japan,JPN.1_1,Aichi,愛知県,JPN.1.2_1,Aisai,,愛西市,Shi,City,,,"POLYGON ((136.70439 35.11997, 136.69835 35.122..."
