In [0]:
# Create the database used by this notebook; IF NOT EXISTS makes the statement
# safe to re-run when the database is already present.
spark.sql("CREATE DATABASE IF NOT EXISTS flooding_data")
# Make flooding_data the session's current database for the statements that follow.
# As the cell's last expression, this call's result is what the cell displays.
spark.sql("USE flooding_data")

DataFrame[]

In [0]:
import requests
import json
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, MapType, ArrayType, DoubleType

# Base URL that every endpoint below is built from.
root = "http://environment.data.gov.uk/flood-monitoring"

#https://environment.data.gov.uk/flood-monitoring/id/stations

# List of potential tables. You can also apply filters if you like.
#
# Entries containing "{id}" are URL *templates*, not ready-to-use URLs: fill
# them in with str.format, e.g. measurements_from_stations.format(id="1029TH").
# The braces are doubled inside the f-strings on purpose; a single-brace {id}
# would interpolate Python's builtin `id` function and produce a URL containing
# "<built-in function id>".
flood_warning = f"{root}/id/floods" #All flood warnings and alerts
flood_areas = f"{root}/id/floodAreas/" #Flood areas
measurement_stations = f"{root}/id/stations" #All measurement stations
measurements_from_stations = f"{root}/id/stations/{{id}}/measures" #Template: all measures available from a particular station
all_measures = f'{root}/id/measures' #All measures from across all the stations
all_measures_all_stations = f'{root}/data/readings' #All readings for all measures from across all the stations
all_readings_particular_measure = f'{root}/id/measures/{{id}}/readings' #Template: all readings for a particular measure
all_readings_particular_station = f'{root}/id/stations/{{id}}/readings' #Template: all readings for measures from a particular station

In [0]:
#To limit the scope I will first collect the measurement stations table, and plot all the stations on the UK map. When a station is selected on the UK map I will then show what the station measures

# Download the station listing and keep the value stored under the response's
# "items" key in memory (nothing is written to disk here).
# The timeout stops the cell from hanging forever if the service does not
# answer; 60 seconds is a generous bound, tune as needed.
measurement_stations_API = requests.get(measurement_stations, timeout=60)
# Fail here, with the HTTP status in the error, instead of later with a
# confusing JSON decode error or KeyError on an error response.
measurement_stations_API.raise_for_status()
measurement_stations_API_data = measurement_stations_API.text
measurement_stations_API_JSON = json.loads(measurement_stations_API_data)
# Presumably a list with one dict per station -- it is passed to
# spark.createDataFrame later; confirm against the API documentation.
measurement_stations_JSON = measurement_stations_API_JSON['items']

In [0]:
# Schema for one station record. Every scalar field is declared as a nullable
# string (numeric/date columns are cast after loading); "measures" is a nested
# array of structs, one struct per measure.
def _nullable_string(field_name):
    """Return a nullable StringType field called ``field_name``."""
    return StructField(field_name, StringType(), True)


schema = StructType(
    [_nullable_string(field_name) for field_name in (
        "@id", "RLOIid", "catchmentName", "dateOpened",
        "datumOffset", "downstageScale", "label",
    )]
    + [StructField("measures", ArrayType(StructType(
        [_nullable_string(field_name) for field_name in (
            "parameter", "parameterName", "period", "qualifier", "unitName",
        )]
    )))]
    + [_nullable_string(field_name) for field_name in (
        "notation", "riverName", "stageScale", "stationReference", "town",
        "wiskiID", "lat", "long", "easting", "northing",
        "status", "statusReason", "statusDate", "type",
    )]
)

# Load the station records, fix up column types, then produce one row per
# (station, measure) pair.
df = (
    spark.createDataFrame(measurement_stations_JSON, schema)
    .withColumn("dateOpened", to_date(col("dateOpened")))  # string -> date
    .withColumn("lat", col("lat").cast("double"))           # string -> double
    .withColumn("long", col("long").cast("double"))         # string -> double
    # explode emits one output row per element of the "measures" array
    .select("*", explode(col("measures")).alias("measures2"))
)

# Flatten the exploded struct: station columns plus the individual measure fields.
station_measure_columns = [
    "@id", "RLOIid", "measures2.parameter", "catchmentName", "dateOpened",
    "datumOffset", "downstageScale", "label", "notation", "riverName",
    "stageScale", "stationReference", "town", "wiskiID", "lat", "long",
    "easting", "northing", "status", "measures2.parameterName",
    "measures2.period", "measures2.qualifier", "measures2.unitName",
]
Measurement_station_df = df.select(*station_measure_columns)

# Expose the flattened frame to SQL and keep only rows whose status contains
# "active" (case-insensitive) and whose town is not the empty string.
Measurement_station_df.createOrReplaceTempView("Monitoring_Stations_View")
active_stations_sql = "SELECT * FROM Monitoring_Stations_View WHERE LOWER(status) LIKE '%active%' AND town <> ''"
filtered_df = spark.sql(active_stations_sql)
display(filtered_df)

# Replace any previous copy of the table with the freshly filtered rows.
spark.sql("DROP TABLE IF EXISTS Tbl_Monitoring_Stations")
filtered_df.write.saveAsTable("Tbl_Monitoring_Stations")

@id,RLOIid,parameter,catchmentName,dateOpened,datumOffset,downstageScale,label,notation,riverName,stageScale,stationReference,town,wiskiID,lat,long,easting,northing,status,parameterName,period,qualifier,unitName
http://environment.data.gov.uk/flood-monitoring/id/stations/1029TH,7041,level,Cotswolds,1994-01-01,,,Bourton Dickler,1029TH,River Dikler,http://environment.data.gov.uk/flood-monitoring/id/stations/1029TH/stageScale,1029TH,Little Rissington,1029TH,51.874767,-1.740083,417990,219610,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Downstream Stage,mASD
http://environment.data.gov.uk/flood-monitoring/id/stations/1029TH,7041,level,Cotswolds,1994-01-01,,,Bourton Dickler,1029TH,River Dikler,http://environment.data.gov.uk/flood-monitoring/id/stations/1029TH/stageScale,1029TH,Little Rissington,1029TH,51.874767,-1.740083,417990,219610,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Stage,mASD
http://environment.data.gov.uk/flood-monitoring/id/stations/E2043,6022,level,Welland,1992-01-01,2.0,,Surfleet Sluice,E2043,River Glen,http://environment.data.gov.uk/flood-monitoring/id/stations/E2043/stageScale,E2043,Surfleet Seas End,L31004,52.845991,-0.100848,528000,329300,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Stage,mASD
http://environment.data.gov.uk/flood-monitoring/id/stations/52119,3072,level,"Parrett, Brue and West Somerset Streams",1997-01-01,10.0,,Gaw Bridge,52119,River Parrett,http://environment.data.gov.uk/flood-monitoring/id/stations/52119/stageScale,52119,Kingsbury Episcopi,520320_FW,50.976043,-2.793549,344383,119926,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Stage,mASD
http://environment.data.gov.uk/flood-monitoring/id/stations/E21136,6177,level,Upper and Bedford Ouse,1996-10-01,6.3,,Hemingford,E21136,River Great Ouse,http://environment.data.gov.uk/flood-monitoring/id/stations/E21136/stageScale,E21136,Hemingford Grey,L33865,52.323618,-0.101287,529500,271200,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Stage,mASD
http://environment.data.gov.uk/flood-monitoring/id/stations/2067,2032,level,Worcestershire Middle Severn,1976-04-06,,,Swindon,2067,Smestow Brook,http://environment.data.gov.uk/flood-monitoring/id/stations/2067/stageScale,2067,Swindon,2067,52.51274,-2.205945,386120,290560,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Stage,mASD
http://environment.data.gov.uk/flood-monitoring/id/stations/48143,3158,level,Fal and St Austell Streams,2000-01-28,13.0,,Penryn Trelawney Park,48143,Praze Stream,http://environment.data.gov.uk/flood-monitoring/id/stations/48143/stageScale,48143,Penryn,S19726_FW,50.172659,-5.109416,178065,34920,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Downstream Stage,mASD
http://environment.data.gov.uk/flood-monitoring/id/stations/720215,5022,level,Lune and Wyre,1977-01-01,,,A6 Bridge,720215,River Brock,http://environment.data.gov.uk/flood-monitoring/id/stations/720215/stageScale,720215,Bilsborrow,720215,53.858854,-2.743276,351210,440550,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Stage,m
http://environment.data.gov.uk/flood-monitoring/id/stations/F1906,8276,flow,Wharfe and Lower Ouse,2001-07-11,,,Netherside Hall,F1906,River Wharfe,http://environment.data.gov.uk/flood-monitoring/id/stations/F1906/stageScale,F1906,Netherside Hall,F1906,54.080705,-2.024779,398476,464978,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Flow,900,Logged,m3/s
http://environment.data.gov.uk/flood-monitoring/id/stations/F1906,8276,level,Wharfe and Lower Ouse,2001-07-11,,,Netherside Hall,F1906,River Wharfe,http://environment.data.gov.uk/flood-monitoring/id/stations/F1906/stageScale,F1906,Netherside Hall,F1906,54.080705,-2.024779,398476,464978,http://environment.data.gov.uk/flood-monitoring/def/core/statusActive,Water Level,900,Stage,m
