### **Breakdown by UK Administrative Geographies.**
- This data pipeline creates 3 GeoDatFrames from digital vector boundaries files for 3 subsets of UK Administrative Geographies: **Countries** (4 rows), **Regions** (12 rows) and **Local Authority Districts** (379 rows).
- Each GeoDataFrame was saved to Postgres as a distinct table.
- They can be then be downloaded from Postgres and reloaded into Geopandas for analysis and visualisation.
- **Downloaded data format: GeoJSON** (obtained via data api).
- **Processed data purpose:** Use as a map plotting source to produce interactive maps for various projects.
- **Data provider: The Office of National Statistics**.
- **Data pipeline result:** 
    
    3 Postgres tables: **gdf_ons_countries**, **gdf_ons_regions** and **gdf_ons_lad20**.


In [1]:
# IMPORT LIBRARIES.
import pandas as pd
import requests
import io
import zipfile
import shutil
import sqlalchemy
from sqlalchemy import create_engine
from shapely import wkt
import os
os.environ['USE_PYGEOS'] = '0'
import geopandas as gpd



In [2]:
# SET DISPLAY OPTIONS (None MEANS UNLIMITED).
# TO SET NUMBER OF ROWS DISPLAYED:
pd.options.display.max_rows=200
# TO SET NUMBER OF COLUMNS DISPLAYED:
pd.options.display.max_columns=None

## 1. DOWNLOAD AND PROCESS DIGITAL VECTOR BOUNDARIES.

### ONS UK Administrative Geographies. FORMAT: Geojson.

In [3]:
def process_geojson():
    """
    This function downloads digital vector boundaries files for UK Countries, Regions and Local Authority Districts 
    via the ONS Open Geography Portal data api.
    Each downloaded GeoJSON file is loaded into GeoPandas as a GeoDataFrame where the attributes are filtered, 
    cleaned and processed into a standardised format.
    """
    # DOWNLOAD DIGITAL VECTOR BOUNDARIES FOR UK countries, regions AND local authority districts.
    # DON'T NEED TO BYPASS 403 ERROR HERE HENCE NO USER AGENT REQUIRED.
    countries_data = requests.get("https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/Countries_December_2020_UK_BUC_2022/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson")
    regions_data = requests.get("https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/Regions_Dec_2020_EN_BUC_2022/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson")
    lad20_data = requests.get("https://services1.arcgis.com/ESMARspQHYMw9BZ9/arcgis/rest/services/Local_Authority_Districts_December_2020_UK_BUC_2022/FeatureServer/0/query?outFields=*&where=1%3D1&f=geojson")

    geo_data = [countries_data,regions_data,lad20_data]
    
    # CREATE EMPTY LIST TO STORE RESULT GEODATAFRAMES.
    gdf_list = []

    for data in geo_data:
        # LOAD GEODATA INTO GEOPANDAS.
        gdf_pre = gpd.GeoDataFrame.from_features(data.json(),crs=4326)
        # DROP COLUMNS: DROP BASED ON COLUMN POSITION, NOT NAME.
        if data == lad20_data:
            # DROP OBJECTID, WELSH NAME AND GlobalID ATTRIBUTES (NO WELSH NAME ATTRIBUTE ON THIS FILE).
            gdf_pre.drop(gdf_pre.columns[[1,4,11]], axis = 1, inplace=True)  
        else:
            # DROP OBJECTID AND GlobalID ATTRIBUTES (NO WELSH NAME ATTRIBUTE ON THESE FILES)
            gdf_pre.drop(gdf_pre.columns[[1,10]], axis = 1, inplace=True)


        # STANDARDISE ATTRIBUTE NAMES ACROSS GEODATAFRAMES, ALSO SETS ATTRIBUTE NAMES AS LOWERCASE.
        gdf_pre.columns = ["geometry","code","name","bng_e","bng_n","long","lat","shape_area","shape_length"]
    
        # LOWERCASE name ATTRIBUTE VALUES AND CLEAN UP name ATTRIBUTE VALUES FOR ADMIN GEOGRAPHIES == REGIONS.
        gdf_pre["name"] = gdf_pre["name"].str.lower()\
                                         .str.split("(england)", expand=True)[0]\
                                         .str.strip(" (")\
                                         .str.replace("east of","east")
    
        # READD name VALUE REMOVED AS A BYPRODUCT OF ABOVE CLEAN UP.
        gdf_pre.loc[gdf_pre["code"]=="E92000001","name"] = "england"
    
        # SET INDEX COLUMN.
        gdf_pre.set_index("code",inplace=True)
    
        # SET DTYPES.
        gdf_pre["name"] = gdf_pre["name"].astype("str")
        gdf_pre[["bng_e","bng_n"]] = gdf_pre[["bng_e","bng_n"]].astype("Int32")
     
        # RESET COLUMN ORDER.
        gdf = gdf_pre[["name","bng_e","bng_n","long","lat","shape_area","shape_length","geometry"]].copy()

        # ADD GEODATAFRAME TO LIST TO STORE RESULT.
        gdf_list.append(gdf)
    
    # RETURN COMPLETE LIST OF GEODATAFRAMES.
    return gdf_list

In [4]:
# RUN THE FUNCTION AND INDEX THE RETURNED LIST TO GENERATE APPROPRIATE GEODATAFRAME.
countries_gdf = process_geojson()[0]
regions_gdf = process_geojson()[1]
lad20_gdf = process_geojson()[2]

---
---

## 2. CONNECT TO POSTGRESQL.

In [5]:
def connect_to_postgres():
    """
    Connect to Postgres database 'github_projects' as user 'postgres'.
    """
    conn_params_dict = {"user":"postgres",
                        "password":"password",
                        # FOR host, USE THE POSTGRES INSTANCE CONTAINER NAME, AS THE CONTAINER IP CAN CHANGE.
                        "host":"postgres",
                        "database":"github_projects"}

    connect_alchemy = "postgresql+psycopg2://%s:%s@%s/%s" % (
        conn_params_dict['user'],
        conn_params_dict['password'],
        conn_params_dict['host'],
        conn_params_dict['database']
    )

    # CREATE POSTGRES ENGINE (CONNECTION POOL).
    engine = create_engine(connect_alchemy)
    print("Connection to Postgres successful.")
    return engine

In [6]:
# EXECUTE FUNCTION TO CONNECT TO POSTGRES.
engine = connect_to_postgres()

Connection to Postgres successful.


---
---

## 3. WRITE GEODATAFRAMES TO POSTGRES.

In [7]:
def ul_gdf_to_pg(gdfs,names,pkey,dtype,index_bool):
    """
    Write 1 or more GeoDataFrames to Postgres for storage. 
    For each written GeoDataFrame, the contents of the geometry column is serialized and then saved as a string 
    in Postgres.
    
    The following arguments are required:
    'gdfs': A list of 1 or more GeoDataFrames to write to Postgres.
    'names': A list of equal length to 'gdfs' with the corresponding names of each Postgres table to be created.
    'pkey': Which attribute to set as the new table(s) primary key. String.
    'dtype': A dictionary of "attribute names:SQL Alchemy data types" for the new table(s). N.B. All tables being 
    written must share the same attribute names/dtypes.
    'index_bool': Whether or not to write the GeoPandas index to Postgres as a column. Possible values True/False.
    """  
    for gdf,table_name in zip(gdfs,names):  
        # SERIALISE THE CONTENTS OF THE geometry COLUMN INTO WKT (well-known text) STRINGS SO THAT IT ...
        # CAN BE REPRESENTED BY DTYPE object AND THE GEODATAFRAME CAN BE SAVED TO POSTGRES.
        gdf['geometry'] = gdf['geometry'].apply(lambda x: wkt.dumps(x))
        
        # CONVERT GEODATAFRAME TO DATAFRAME AND WRITE TO POSTGRES.
        pd.DataFrame(gdf).to_sql(table_name, con = engine, if_exists='replace', index=index_bool,
                                 # SET POSTGRES DTYPES.
                                 dtype=dtype)
    
        # ADD PRIMARY KEY TO CREATED TABLE. 
        set_primary_key = engine.execute(f"""
                                         ALTER TABLE {table_name} ADD PRIMARY KEY ({pkey})
                                         """)
        set_primary_key.close()
    
    if len(names)==1:
        print(f"The \033[1m{table_name}\033[0m Postgres table has been successfully created.\n")
    else:
        print(f"The \033[1m{', '.join(names[:-1])}\033[0m and \033[1m{(names)[-1]}\033[0m Postgres tables have been successfully created.\n")

### 3.1. WRITE DATA SOURCE 1 TO POSTGRES:

In [8]:
# WRITE THE ONS GEOJSON BASED GEODATAFRAMES TO POSTGRES:
ul_gdf_to_pg(gdfs=[countries_gdf,regions_gdf,lad20_gdf],
             names=["gdf_ons_countries","gdf_ons_regions","gdf_ons_lad20"],
             pkey="code",
             dtype={
                    "code":sqlalchemy.types.Text,
                    "name":sqlalchemy.types.Text,
                    "bng_e":sqlalchemy.types.Integer,
                    "bng_n":sqlalchemy.types.Integer,
                    "long":sqlalchemy.types.Float,
                    "lat":sqlalchemy.types.Float,
                    "shape_area":sqlalchemy.types.Float,
                    "shape_length":sqlalchemy.types.Float,
                    "geometry":sqlalchemy.types.Text
             },
             index_bool=True
            )



The [1mgdf_ons_countries, gdf_ons_regions[0m and [1mgdf_ons_lad20[0m Postgres tables have been successfully created.



---
---

## 4. DOWNLOAD GEODATAFRAME FROM POSTGRES.

In [9]:
def dl_gdf_from_pg(table_name,dtype,index_col=None):
    """
    Download a single Postgres table and automatically process it into a GeoPandas GeoDataFrame. 
    For each downloaded GeoDataFrame, the contents of the geometry column is deserialized and then saved as a 
    dtype="geometry" in GeoPandas.
    
    The following arguments are required:
    'table_name': A string with the name of the GeoDataFrame as held on Postgres as a table.
    'dtype': A dictionary of "attribute names:GeoPandas data types" for the downloaded GeoDataFrame. If an 
    attribute is not present in this dictionary then a default dtype is set by GeoPandas automatically for 
    the attribute.
    'index_col': Which attribute to set as the index of the GeoDataFrame. Optional string, defaults to None.
    """ 
    # DOWNLOAD DATAFRAME VERSION OF DIGITAL VECTOR BOUNDARIES FROM POSTGRES.
    df_from_pg = pd.read_sql_table(table_name, con=engine)
    
    # DESERIALIZE THE WKT STRINGS REPRESENTATION OF THE GEOMETRY COLUMN.
    df_from_pg['geometry'] = df_from_pg["geometry"].apply(lambda x: wkt.loads(x))

    # CONVERT DATAFRAME INTO GEODATAFRAME AND SET GEOMETRY COLUMN.
    gdf_from_pg = gpd.GeoDataFrame(df_from_pg,geometry=df_from_pg["geometry"],crs=4326)
    
    # SET DTYPES.
    gdf_from_pg = gdf_from_pg.astype(dtype)
    
    print(f"The \033[1m{table_name}\033[0m table was successfully downloaded from Postgres and loaded into GeoPandas.\n") 
    
    # SET INDEX COLUMN (OPTIONAL).
    if index_col==None:
        print(f"No index attribute was set on the GeoDataFrame so the default numeric index has been used.")
    else:
        gdf_from_pg.set_index(index_col,inplace=True)
        print(f"The \033[1m{index_col}\033[0m attribute was set as the GeoDataFrame index.")
   
    
    return gdf_from_pg

### 4.1. DOWNLOAD DATA SOURCE 1 FROM POSTGRES AND LOAD INTO GEOPANDAS:

In [10]:
# ONS BASED GEODATAFRAME DOWNLOAD FROM POSTGRES.
dl_gdf_from_pg(table_name="gdf_ons_countries",

               dtype={"name":"string",
                      "bng_e":"Int32",
                      "bng_n":"Int32",
                      "long":"float32",
                      "lat":"float32",
                      "shape_area":"float32",# MIGHT CHANGE BACK TO float64.
                      "shape_length":"float32"
                      },
               index_col="code"
              )

The <gdf_ons_countries> table was successfully downloaded from Postgres and loaded into GeoPandas.

The [1mcode[0m attribute was set as the GeoDataFrame index.


Unnamed: 0_level_0,name,bng_e,bng_n,long,lat,shape_area,shape_length,geometry
code,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
E92000001,england,394883,370883,-2.07811,53.23497,130681600000.0,4616816.0,"MULTIPOLYGON (((-1.77887 55.66773, -1.80244 55..."
N92000002,northern ireland,86544,535337,-6.8557,54.615009,14331860000.0,829125.2,"MULTIPOLYGON (((-6.18992 55.25846, -6.20082 55..."
S92000003,scotland,277744,700060,-3.97094,56.177429,78656630000.0,9812084.0,"MULTIPOLYGON (((-0.79514 60.74093, -0.79106 60..."
W92000004,wales,263405,242881,-3.99417,52.06741,20818920000.0,1556249.0,"MULTIPOLYGON (((-3.08993 53.25953, -3.09314 53..."


---
---
## 5. CLOSE ALL CONNECTIONS TO POSTGRES DATABASE.

In [11]:
def disconnect_from_postgres():
    """
    Completely disconnect from Postgres.
    """
    engine.dispose() 
    print("All connections to Postgres have been terminated.")

In [12]:
disconnect_from_postgres()

All connections to Postgres have been terminated.
