###**Objective**

When a sewage treatment works (STW) receives a larger inflow of effluent than it can normally treat, the excess is routed to a storm tank. Occasionally the inflow is so large that the storm tank fills, and the excess is discharged to the environment (typically a river) through a consented overflow.
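The routing described above can be sketched as a simple per-time-step mass balance: flow goes to treatment first, then into the storm tank, and only what the tank cannot hold leaves through the overflow. This is an illustrative simplification, not how any particular works is controlled; the function name `route_inflow`, the volumes, and the assumption that the storm tank never drains back to treatment are all hypothetical.

```python
from dataclasses import dataclass


@dataclass
class StepResult:
    treated: float  # volume sent to full treatment
    to_tank: float  # volume diverted into the storm tank
    spilled: float  # volume discharged via the consented overflow


def route_inflow(inflows, treatment_capacity, tank_capacity):
    """Route each time step's inflow: treatment first, then storm tank, then overflow.

    Simplifying assumption: the storm tank never drains back to treatment.
    """
    tank_level = 0.0
    results = []
    for inflow in inflows:
        treated = min(inflow, treatment_capacity)
        excess = inflow - treated
        # fill whatever room is left in the storm tank
        to_tank = min(excess, tank_capacity - tank_level)
        tank_level += to_tank
        # anything the tank cannot hold leaves through the overflow
        spilled = excess - to_tank
        results.append(StepResult(treated, to_tank, spilled))
    return results


steps = route_inflow([80, 120, 150, 150], treatment_capacity=100, tank_capacity=60)
for step in steps:
    print(step)
```

With these made-up numbers the tank absorbs the excess for two steps and the spill only starts once it is full.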

An STW is only permitted to discharge to the environment if it is already treating the maximum flow it was designed to treat. Otherwise, the discharge is classified as a pollution event and the responsible water company can be fined.
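The rule above can be expressed as a small classification function. This is a minimal sketch assuming a single `design_flow` threshold with no tolerance; real permits are more detailed, and `classify_discharge` and its parameters are hypothetical names.

```python
def classify_discharge(spilling: bool, flow_to_treatment: float, design_flow: float) -> str:
    """Label one time step using a simplified reading of the discharge rule.

    Assumes `design_flow` is the maximum flow the works was designed to treat.
    """
    if not spilling:
        return "no discharge"
    # a spill is only permitted while the works is treating its full design flow
    if flow_to_treatment >= design_flow:
        return "consented discharge"
    return "pollution event"


print(classify_discharge(spilling=True, flow_to_treatment=70.0, design_flow=100.0))
```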

To address this problem, we make use of a waste water digital twin: a digital representation of the waste water network that links the geospatial data and other properties of physical assets with the telemetry data generated by those assets.


- We try to assess data quality and find the site(s) at which pollution events occurred.
- We connect these site(s) to the twin to find the nearby SLMs and SPSs and examine how these assets are interconnected.
- Ultimately, we want to see whether the SLMs and SPSs can be used as early indicators of pollution events.

###**Import Libraries**

In [0]:
import numpy as np
import pickle
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
from pyspark.sql import Window
from datetime import datetime, timedelta
import pyspark.sql.functions as F
from pyspark.sql.types import *
from data_factory.databricks.Metastore import Metastore
metastore = Metastore()

###**Loading Data**

- **HSI Asset Hierarchy**: contains metadata for all assets in the DAM tool, including consented overflow levels, sewer levels, storm tank levels, and relevant flow monitors (e.g. flow to treatment, final effluent).
- **HSI data**: contains data from the High Speed Interface (HSI), starting in April 2021, for all assets in the **HSI Asset Hierarchy** table.
- **Alert Configuration**: contains the latest alert configurations for the assets in the **HSI data**. For each alert it includes the flow permit, flow setpoint, or threshold at which the alert is triggered; the condition for triggering relative to that alert level; and the monitor name, which corresponds to the last part of the SCADA full name (if the data comes from HSI).
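The "condition for triggering relative to the alert level" is stored as a `conditional_operator` string. A minimal sketch of how such a string might be evaluated, assuming the operators are plain comparison symbols; only `>` is visible in this notebook's sample rows, so the other entries in `OPERATORS` and the function name `alert_triggered` are assumptions.

```python
import operator

# Hypothetical mapping from `conditional_operator` strings to comparison functions;
# only '>' is confirmed by the sample data, the rest are assumptions.
OPERATORS = {
    ">": operator.gt,
    ">=": operator.ge,
    "<": operator.lt,
    "<=": operator.le,
}


def alert_triggered(value, alert_level, conditional_operator):
    """Return True if `value` breaches `alert_level` under the configured condition."""
    if value is None or alert_level is None:
        return False  # missing data cannot trigger an alert
    compare = OPERATORS.get(conditional_operator)
    if compare is None:
        raise ValueError(f"unsupported operator: {conditional_operator!r}")
    return compare(value, alert_level)


print(alert_triggered(171.43, 282.0, ">"))
```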

In [0]:
### Read hsi_asset hierarchy table
hsi_asset_hier = spark.table('dpslm_default.hsi_asset_hier')
hsi_asset_hier.orderBy('site_level','asset').display()

### Read hsi data table
hsi_data = spark.table('dpslm_default.hsi_data_history_shared')
hsi_data.orderBy('site_level','asset','time').display()

### Read alert_configuration table
config = spark.table('dpslm_default.alert_configuration')
config.orderBy('id').display()

id,hsi_id,asset,site_level,value,units,time,quality
124633075,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,612.5,mm,2021-04-06T05:00:00.000+0000,Good
127947993,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,601.25,mm,2021-04-06T06:00:00.000+0000,Good
138984730,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,602.0,mm,2021-04-06T07:00:00.000+0000,Good
73583473,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,617.5,mm,2021-04-06T08:00:00.000+0000,Good
84230427,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,610.0,mm,2021-04-06T09:00:00.000+0000,Good
80779555,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,599.5,mm,2021-04-06T10:00:00.000+0000,Good
91492127,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,635.0,mm,2021-04-06T11:00:00.000+0000,Good
121300168,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,598.0,mm,2021-04-06T12:00:00.000+0000,Good
86538071,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,607.25,mm,2021-04-06T13:00:00.000+0000,Good
78331751,1244740,CONSENTED OVERFLOW LEVEL,ABBESS RODING STW,614.5,mm,2021-04-06T14:00:00.000+0000,Good


####Joining the hsi_data and alert_configuration data

We use a right join so that every row of **alert configuration** is kept, with the matching **hsi data** attached where it exists. The alert levels are important because they reveal when asset values exceeded their thresholds.
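To make the right-join semantics concrete, here is a minimal plain-Python sketch on dictionaries keyed by the same composite key. It is an illustration, not Spark's implementation: the `right_join` helper and the `EXAMPLE STW`/`OTHER STW` rows are hypothetical. Note that HSI readings for assets without an alert configuration are dropped, while configured assets without readings are kept with empty values.

```python
def right_join(left_rows, right_rows, key):
    """Keep every right-hand row; attach matching left-hand rows where they exist.

    Rows are dicts and `key` is a tuple of column names (the composite key).
    Unmatched right rows get None for the left-only columns.
    """
    left_cols = {col for row in left_rows for col in row} - set(key)
    index = {}
    for row in left_rows:
        index.setdefault(tuple(row[k] for k in key), []).append(row)
    joined = []
    for right in right_rows:
        matches = index.get(tuple(right[k] for k in key), [])
        if not matches:
            # configured asset with no HSI readings: keep it with empty left columns
            joined.append({**{col: None for col in left_cols}, **right})
        for left in matches:
            # unlike Spark, a non-key column present on both sides is overwritten here
            joined.append({**left, **right})
    return joined


key = ("hsi_id", "site_level", "asset")
hsi = [
    {"hsi_id": 0, "site_level": "ALDERMASTON STW", "asset": "UMON3 ALDES1ZZ", "value": 171.4},
    {"hsi_id": 9, "site_level": "OTHER STW", "asset": "OTHER ASSET", "value": 5.0},
]
config_rows = [
    {"hsi_id": 0, "site_level": "ALDERMASTON STW", "asset": "UMON3 ALDES1ZZ", "alert_level": 282.0},
    {"hsi_id": 1, "site_level": "EXAMPLE STW", "asset": "EXAMPLE ASSET", "alert_level": 10.0},
]
rows = right_join(hsi, config_rows, key)
for r in rows:
    print(r)
```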

In [0]:
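# combine hsi_data with the alert configuration on the composite key
composite_key = ['hsi_id', 'site_level', 'asset']

# right join: keep every alert-configuration row, attaching matching HSI readings
# note: both tables have `id` and `units` columns, so the result contains duplicate column names
df_hsi_data = hsi_data.join(config, on=composite_key, how='right')

df_hsi_data.display()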

hsi_id,site_level,asset,id,value,units,time,quality,id.1,alert_id,alert_priority,date_time_added,alert_level,units.1,conditional_operator,logical_operator,site_level_user,alert_type,flow_type,comments
0,ALDERMASTON STW,UMON3 ALDES1ZZ,228891971,171.43333333333334,mm,2021-04-01T11:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,239055132,205.2,mm,2021-04-15T20:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,240396115,162.23333333333332,mm,2021-04-19T15:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,239304250,101.7,mm,2021-04-24T05:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,227680190,146.63333333333333,mm,2021-04-24T15:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,235113446,174.36666666666667,mm,2021-04-24T20:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,235879359,178.93103448275863,mm,2021-04-29T06:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,240075420,70.06666666666666,mm,2021-05-01T02:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,218145901,125.3103448275862,mm,2021-05-02T06:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,
0,ALDERMASTON STW,UMON3 ALDES1ZZ,238893484,182.53333333333333,mm,2021-05-04T14:00:00.000+0000,Unknown,65,3,,2022-03-03T06:29:49.273+0000,282.0,mm,>,,Lizette Loubser,UMON3,Setpoint,


####Filtering site_levels

We hypothesise that when a pollution event occurs, the **CONSENTED OVERFLOW LEVEL** will be above its alert threshold, although this may need to be confirmed by the EDM data of other assets. We therefore want the sites that have a **CONSENTED OVERFLOW** asset.
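One way to apply this corroboration idea is to flag a timestamp only when the consented overflow asset and at least one other asset are both above their alert levels. A minimal plain-Python sketch, assuming readings have already been collected into a `{timestamp: {asset: (value, alert_level)}}` mapping; the function name, the `STORM TANK LEVEL` asset name, and the alert levels in the example are hypothetical.

```python
def corroborated_spills(readings, overflow_asset, min_supporting=1):
    """Timestamps where `overflow_asset` is above its alert level and at least
    `min_supporting` other assets are also above theirs.

    `readings` maps timestamp -> {asset: (value, alert_level)}.
    """
    flagged = []
    for ts in sorted(readings):
        assets = readings[ts]
        if overflow_asset not in assets:
            continue
        value, level = assets[overflow_asset]
        if value is None or level is None or value <= level:
            continue
        # count other assets that are also in breach at this timestamp
        supporting = sum(
            1
            for name, (v, lvl) in assets.items()
            if name != overflow_asset and v is not None and lvl is not None and v > lvl
        )
        if supporting >= min_supporting:
            flagged.append(ts)
    return flagged


readings = {
    "2021-04-06T05:00": {"CONSENTED OVERFLOW LEVEL": (612.5, 600.0), "STORM TANK LEVEL": (3.1, 3.0)},
    "2021-04-06T06:00": {"CONSENTED OVERFLOW LEVEL": (601.25, 600.0), "STORM TANK LEVEL": (2.0, 3.0)},
    "2021-04-06T07:00": {"CONSENTED OVERFLOW LEVEL": (590.0, 600.0), "STORM TANK LEVEL": (3.2, 3.0)},
}
print(corroborated_spills(readings, "CONSENTED OVERFLOW LEVEL"))
```

Raising `min_supporting` trades sensitivity for confidence: fewer timestamps are flagged, but each is backed by more independent assets.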

In [0]:
# collect the set of assets behind each alert_id at each site, for the flow-below-setpoint/permit alert types
config_assets = (config.filter(F.col('alert_type').isin(['STK - Flow Below Setpoint', 'SDE - Flow Below Setpoint', 'STK - Flow Below Permit', 'SDE - Flow Below Permit']))
       .groupBy('site_level', 'alert_id')
       .agg(F.collect_set('asset').alias('assets_list_by_alert_id_config')))

# from Spark DataFrame to pandas DataFrame
config_assets_pandas = config_assets.toPandas()

# sites with at least two assets, one of which is a consented overflow
sites = []
for row in config_assets_pandas.itertuples(index=False):
  asset_list = row.assets_list_by_alert_id_config
  if len(asset_list) >= 2 and ('CONSENTED OVERFLOW' in asset_list or 'CONSENTED OVERFLOW LEVEL' in asset_list):
    sites.append(row.site_level)

# number of unique sites with a CONSENTED OVERFLOW asset
len(set(sites))

There are 180 STWs with a consented overflow. We then restrict the analysis to sites whose data is already loaded in the waste water digital twin. The list of sites in the twin is defined below (hard-coded here rather than read from the pickle file).

In [0]:
twin_sites = sorted(['HOOK NORTON',
 'BROAD HINTON',
 'KINGS SUTTON',
 'UTTLESFORD ',
 'GRENDON UNDERWOOD',
 'FLEET',
 'CHOBHAM',
 'WHITE RODING',
 'CLAVERING',
 'HADDENHAM',
 'SULHAMPSTEAD',
 'WARMINGTON',
 'WISLEY',
 'KINGSTON BAGPUIZE',
 'RIPLEY',
 'BILLERICAY (AW)',
 'GORING',
 'CASTLE EATON',
 'UPPER WINCHENDON',
 'MAPLE LODGE',
 'SEVENHAMPTON',
 'LECKHAMPSTEAD',
 'ELSFIELD',
 'EAST ILSLEY',
 'HOCKFORD (PIRBRIGHT)',
 'CLANFIELD',
 'MAIDENHEAD',
 'COTTERED',
 'BAYDON',
 'KINTBURY',
 'NEWBURY ',
 'MATTINGLEY (HOUND GREEN)',
 'GREAT ROLLRIGHT',
 'STANFORD RIVERS',
 'WINGRAVE',
 'SANDFORD ST MARTIN',
 'STRATFIELD MORTIMER',
 'SHRIVENHAM',
 'BLETCHINGDON',
 'CHAPMORE END',
 'FROXFIELD',
 'LONGBOROUGH',
 'BROUGHTON',
 'NORTHLEACH',
 'BURSTOW',
 'LITTLE MARLOW',
 'LIGHTWATER',
 'BECKTON',
 'DORTON',
 'BASINGSTOKE',
 'BEENHAM',
 'DORCHESTER',
 'BENSON',
 'NEWBURY',
 'FEWCOT (AW)',
 'GREAT MILTON',
 'WESTCOTT (PRIVATE)',
 'RYE MEADS',
 'HAMPSTEAD NORREYS',
 'BLUNSDON',
 'AYLESBURY',
 'ST STEPHENS CLOSE ',
 'RIVERSIDE',
 'MILTON-UNDER-WYCHWOOD',
 'SHELLINGFORD',
 'FAIRFORD',
 'ENSTONE',
 'HAM HILL (SW)',
 'ABBESS RODING',
 'CHIPPING NORTON',
 'LOXWOOD (SW)',
 'DIDCOT',
 'ASCOT',
 'SHOTTESWELL',
 'UPPER SUNDON',
 'ALDERSHOT',
 'LITTLE MILTON',
 'MIDGHAM',
 'FARNBOROUGH',
 'CUDDINGTON',
 'CRONDALL',
 'ASH VALE',
 'WHITCHURCH',
 'WOOLHAMPTON',
 'CASSINGTON',
 'BLACKBIRDS',
 'LEWKNOR',
 'FARINGDON',
 'COMPTON',
 'BUCKLEBURY',
 'ASHLEY GREEN',
 'CRANLEIGH',
 'HOGSMILL',
 'ANDOVERSFORD',
 'WINDSOR',
 'BEDDINGTON',
 'HASLEMERE',
 'WASHWATER',
 'BURGHFIELD',
 'WITHINGTON',
 'HOLMWOOD',
 'RYE COMMON (DOGMERSFIELD)',
 'MOLLINGTON',
 'STONE',
 'NUNEHAM COURTENAY',
 'HEYFORD (UPPER HEYFORD)',
 'HITCHIN (AW)',
 'TYLERS LANE',
 'APPLETON',
 'BARKWAY',
 'MARSH GIBBON',
 'WHITE WALTHAM',
 'LECHLADE',
 'GODALMING',
 'SANDON (AW)',
 'CHIPPING WARDEN',
 'SONNING COMMON',
 'Highfields (Frampton Mansell)',
 'GUILDFORD',
 'CHATTER ALLEY',
 'BREACHWOOD GREEN',
 'HORLEY',
 'MOGDEN',
 'HENLEY',
 'AVON DASSETT',
 'CHARLBURY',
 'OVING (AW)',
 'CHARNEY BASSETT',
 'EPPING',
 'CRAWLEY',
 'MORETON',
 'ASTON ABBOTTS (AW)',
 'CHERTSEY',
 'WADDESDON',
 'CHESHAM',
 'LITTLE COMPTON',
 'CHARLTON-ON-OTMOOR',
 'BURFORD',
 'WILTON',
 'LONG WITTENHAM',
 'STEWKLEY',
 'DANE END',
 'HAMBLEDEN',
 'COBERLEY',
 'LONG CRENDON',
 'WESTON',
 'ASHFORD HILL',
 'CRICKLADE',
 'STRATFIELD SAYE',
 'WANBOROUGH',
 'STADHAMPTON',
 'BRACKNELL',
 'WOOTTON BASSETT (WW)',
 'LOWER BASILDON',
 'MIDDLETON CHENEY',
 'CHACOMBE',
 'THE GABLES (ADBURY HOLT)',
 'BOURTON ON THE WATER',
 'CULWORTH',
 'THAME',
 'RAMSBURY',
 'FAWLEY',
 'BORDON',
 'HORNTON',
 'CHURCH HANBOROUGH',
 'BYFIELD',
 'CLAYDON',
 'BIBURY',
 'BRICKENDON',
 'TETSWORTH',
 'ARBORFIELD',
 'FINSTOCK',
 'LITTLE BERKHAMSTEAD',
 'LUTON (EAST HYDE)',
 'SOUTH LEIGH',
 'THERFIELD',
 'UFFINGTON',
 'ISLIP',
 'BENTLEY',
 'IRONSBOTTOM',
 'CLIFTON',
 'QUENDON (AW)',
 'THORPE MANDEVILLE',
 'SOUTH MORETON',
 'COMBE',
 'GUITING POWER',
 'HIGHWORTH',
 'SWINDON',
 'ELSTEAD',
 'EVERSLEY CROSS (LONG WATER)',
 'STANTON ST JOHN',
 'RUDGEWICK (SW)',
 'HARPENDEN',
 'EAST SHEFFORD',
 'WHEATLEY',
 'HUNGERFORD',
 'WATLINGTON',
 'CHIEVELEY',
 'HALTON (PRIVATE)',
 'FOSCOT',
 'LEADEN RODING',
 'ALTON',
 'DRAYTON',
 'CHILTON FOLIAT',
 'BASILDON PARK',
 'SHERFIELD-ON-LODDON',
 'HANNINGTON (WILTS)',
 'FOREST HILL',
 'PIKE HILL RISE',
 'FRIETH',
 'BUSCOT',
 'CULHAM',
 'RATLEY',
 'NAUNTON',
 'MIDDLETON STONEY',
 'SYREFORD',
 'BROADWELL',
 'DAGNALL',
 'WESTON ON THE GREEN',
 'UPMINSTER (AW)',
 'GERRARDS CROSS',
 'FURNEUX PELHAM',
 'NETTLEBED',
 'LONG SUTTON',
 'YATTENDON',
 'CROSSNESS',
 'NORTH WEALD',
 'LEATHERHEAD',
 'WILLINGALE',
 'HEADLEY',
 'TWYFORD (AW)',
 'CHALTON (AW)',
 'THEYDON BOIS',
 'STANTON HARCOURT',
 'TOWERSEY',
 'WOKING',
 'REIGATE (EARLSWOOD)',
 'MORETON PINKNEY',
 'WOKINGHAM (ASH RIDGE)',
 'ABINGDON',
 'LITTLE HALLINGBURY',
 'BUNTINGFORD',
 'LITTLEWORTH',
 'ROWSHAM',
 'CHADLINGTON',
 'BUCKLAND',
 'WOODEATON',
 'AMPNEY ST PETER',
 'GREATWORTH',
 'WELFORD',
 'SHUTFORD',
 'PURTON',
 'SELBORNE',
 'BODDINGTON',
 'KIMPTON',
 'GREENHAM COMMON',
 'STREATLEY',
 'WIDFORD AND WARESIDE',
 'COLESHILL',
 'REMENHAM HILL',
 'HANWELL',
 'LUDGERSHALL',
 'CHILTON',
 'CIRENCESTER',
 'ASTON LE WALLS',
 'EAST GRAFTON',
 'SHIRBURN',
 'OXTED & LIMPSFIELD (SW)',
 'HURLEY',
 'TACKLEY',
 'NEW MILL EVERSLEY',
 'BRAMFIELD',
 'MERSTHAM',
 'SANDHURST',
 'EYDON',
 'HORLEY (OXON)',
 'WANTAGE',
 'WICKHAM',
 'MIDDLE BARTON',
 'WOLVERTON TOWNSEND',
 'CAMBERLEY',
 'ALDERMASTON',
 'CHARWELTON',
 'BILLINGBEAR',
 'SHERBOURNE ST JOHN',
 'ASHENDON',
 'STANDON',
 'BRENTWOOD',
 'CUDDESDON',
 'CHAPEL ROW',
 'BRAUGHING',
 'ASHTON KEYNES',
 'MARKYATE',
 'WHITWELL',
 'WOODSTOCK',
 'ALDERSHOT (MOD)',
 'BOXFORD',
 'WITNEY',
 'WHITTINGTON',
 'IVER NORTH',
 'DORKING',
 'MARLBOROUGH',
 'SILCHESTER',
 'WARWICK WOLD',
 'EASTHAMPSTEAD PARK',
 'BISHOPS GREEN',
 'WOLVERTON COMMON',
 'OXFORD',
 'HORTON CUM STUDLEY',
 'HARTLEY WINTNEY',
 'BECKLEY',
 'FARNHAM',
 'WARGRAVE',
 'MORETON-IN-MARSH',
 'SHALBOURNE',
 'HAMPSTEAD MARSHALL',
 'HANNINGTON',
 'PRINCES RISBOROUGH',
 'CROPREDY',
 'HATFIELD HEATH',
 'HATFIELD (MILL GREEN)',
 'READING',
 'STEEPLE CLAYDON (AW)',
 'SOUTHROP',
 'COLGATE',
 'BANBURY',
 'BOURTON',
 'Farnham',
 'PANGBOURNE',
 'DODDINGHURST (AW)',
 'GREAT BEDWYN',
 'BAMPTON',
 'LONGREACH',
 'TRING',
 'CHOLSEY',
 'BLOXHAM',
 'BLEDINGTON',
 'SHABBINGTON',
 'TARLTON STW',
 'SHAMLEY GREEN',
 'HANDCROSS (SW)',
 'THORNWOOD',
 'BERKHAMPSTEAD',
 'BICESTER',
 'HUNTERCOMBE',
 'KEMPSFORD',
 'GREAT GADDESDEN',
 'BARFORD ST MICHAEL',
 'DEEPHAMS',
 'BISHOPS STORTFORD',
 'KINGSCLERE',
 'CHALGROVE',
 'CHENIES',
 'ASHAMPSTEAD (THE STUBBLES)',
 'PRIORS MARSTON',
 'TAKELEY',
 'MANUDEN',
 'STANDLAKE',
 'FYFIELD (CLATFIELD)',
 'ESHER',
 'STANFORD IN THE VALE',
 'SLOUGH',
 'WORMINGHALL',
 'TIDDINGTON',
 'STANSTED MOUNTFITCHET',
 'HAMPDEN ROW',
 'STUDHAM',
 'CROUGHTON',
 'CARTERTON',
 'DUNSTABLE (AW)',
 'CHINNOR',
 'TEMPLE GUITING',
 'HIGH RODING (AW)',
 'WINTERBOURNE',
 'COATES',
 'SPELSBURY',
 'CADDINGTON',
 'WEYBRIDGE',
 'GODSTONE (SW)',
 'WING (AW)'])



In [0]:
# append the ' STW' suffix used for site_level in the HSI data
twin_sites = [i + ' STW' for i in twin_sites]

# unique sites that are both in the waste twin and in the consented-overflow list
sites = sorted(set(sites) & set(twin_sites))

# number of sites
print(len(sites))
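The twin list contains a few entries that plain `i + ' STW'` concatenation will not match exactly: trailing spaces (`'UTTLESFORD '`, `'NEWBURY '`, `'ST STEPHENS CLOSE '`), mixed case (`'Farnham'`, `'Highfields (Frampton Mansell)'`), and one name that already ends in STW (`'TARLTON STW'`). A minimal normalisation sketch follows, assuming HSI `site_level` names are upper-case with a single ` STW` suffix, as in the sample outputs; `to_hsi_site_name` is a hypothetical helper, and applying it may change the site count reported below.

```python
import re


def to_hsi_site_name(twin_name: str) -> str:
    """Map a twin site name to the assumed HSI `site_level` form, e.g. 'OXFORD STW'."""
    # collapse stray/double whitespace and upper-case (HSI names appear upper-case)
    name = re.sub(r"\s+", " ", twin_name).strip().upper()
    # avoid 'TARLTON STW STW' for entries that already carry the suffix
    return name if name.endswith(" STW") else f"{name} STW"


raw = ["UTTLESFORD ", "NEWBURY ", "NEWBURY", "Farnham", "FARNHAM", "TARLTON STW"]
normalised = sorted({to_hsi_site_name(n) for n in raw})
print(normalised)
```

Using a set also removes the duplicates (`'NEWBURY'`/`'NEWBURY '`, `'FARNHAM'`/`'Farnham'`) that the original list contains.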

There are 157 sites that have **CONSENTED OVERFLOW LEVEL** data and are also in the waste twin. For these sites we assess the nature of each discharge (consented or unconsented) and the data quality.

We prioritise sites with high average flows (the **FINAL EFFLUENT FLOW MEAN** and **FLOW TO TREATMENT MEAN_CALCULATED** assets, averaged over readings above their alert level): these are the STWs that process the most effluent. If a spill were to happen at one of them, a large volume of effluent would flow to the environment, leading to larger fines if the discharge is unconsented and higher clean-up costs.

In [0]:
# average value, per site, of the flow assets while they are above their alert level
list_asset = ['FINAL EFFLUENT FLOW MEAN', 'FLOW TO TREATMENT MEAN_CALCULATED']

(df_hsi_data
 .filter(F.col('asset').isin(list_asset)
         & (F.col('value') > F.col('alert_level'))
         & F.col('site_level').isin(sites))
 .groupBy('site_level')
 .agg({'value': 'avg'})
 .orderBy('avg(value)', ascending=False)
 .display())

site_level,avg(value)
MAIDENHEAD STW,752184.173574923
CROSSNESS STW,13515.136723623067
WADDESDON STW,5878.173673356796
DEEPHAMS STW,4860.849463898875
MAPLE LODGE STW,2751.4837294403023
RYE MEADS STW,2381.8185885541448
BEDDINGTON STW,2061.265652063147
HOGSMILL STW,1908.3062866210948
READING STW,1381.3586821211147
SWINDON STW,1324.3720135485232


## STW(s) Analysis

Some sites have incomplete data, so we restrict the analysis to sites that:

- Have a consented overflow level as one of their assets.
- Have 3 or more assets that can corroborate one another.

We define the following helper functions:

- The **get_asset_name_count** function: given a site name, returns each asset and its number of values.

In [0]:
def get_asset_name_count(site_name):
  """
  Count the records available for each asset at the specified site.

  Params:
  site_name : site_level, e.g. OXFORD STW

  Outputs: Spark DataFrame with columns `asset` and `count`
  """

  # site_level dataframe
  site_stw_df = df_hsi_data.filter((df_hsi_data['site_level'] == site_name)) 
  
  # grouped by asset count
  asset_count = site_stw_df.groupBy('asset').count()
  
  return asset_count
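The two site criteria above can be checked against the per-asset counts this function returns, once they are collected into a dictionary (for example via `dict(zip(df['asset'], df['count']))` on its pandas output). A minimal sketch: `is_eligible`, its thresholds, and the `STORM TANK LEVEL`/`SEWER LEVEL` asset names are hypothetical, and ignoring assets with a single record mirrors the `count > 1` filter in `get_alerts`.

```python
CONSENTED_OVERFLOW_ASSETS = {"CONSENTED OVERFLOW", "CONSENTED OVERFLOW LEVEL"}


def is_eligible(asset_counts, min_assets=3, min_records=2):
    """Apply the two site criteria to a {asset: record_count} mapping.

    Assets with fewer than `min_records` records are treated as too sparse to use.
    """
    usable = {asset for asset, n in asset_counts.items() if n >= min_records}
    return len(usable) >= min_assets and bool(usable & CONSENTED_OVERFLOW_ASSETS)


print(is_eligible({"CONSENTED OVERFLOW LEVEL": 500, "STORM TANK LEVEL": 480, "FINAL EFFLUENT FLOW MEAN": 510}))
```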

- The **data_collector** function: given a site name, returns a pivot table (Spark DataFrame) with, for each timestamp, the maximum value of each asset and a column for each alert level.

In [0]:
def data_collector(site_name):
  
  """
  Bring together, for one site, the maximum value of each asset and the
  alert levels at each timestamp.

  Params:
  site_name : site_level, e.g. ABBESS RODING STW

  Outputs: Spark DataFrame with a `time` column, one column per asset and
           one column per alert_level value (call .toPandas() for pandas)
  """
  
  # enable Arrow for faster Spark -> pandas conversion
  # ('spark.sql.execution.arrow.enabled' is the deprecated pre-Spark 3.0 name)
  spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
  
  # site_level dataframe
  site_stw_df = df_hsi_data.filter(df_hsi_data['site_level'] == site_name)

  # one column per distinct alert_level, holding the maximum alert_level at each time
  pivot_DF_alerts = site_stw_df.groupBy("time").pivot("alert_level") \
              .max("alert_level").orderBy('time')
  
  # one column per asset, holding the maximum value at each time
  pivot_DF_asset = site_stw_df.groupBy("time").pivot("asset") \
              .max("value").orderBy('time')

  # combine asset values and alert-level columns on time
  pivot_DF_full = pivot_DF_asset.join(pivot_DF_alerts, on='time', how='right') \
                  .dropDuplicates() \
                  .orderBy('time')
    
  return pivot_DF_full
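What `groupBy("time").pivot("asset").max("value")` does can be illustrated in plain Python: long `(time, asset, value)` records become one row per timestamp with one entry per asset, keeping the maximum value. This is an explanatory sketch, not Spark's implementation; `pivot_max` is a hypothetical name, and missing cells are simply absent rather than null.

```python
from collections import defaultdict


def pivot_max(records):
    """Long (time, asset, value) records -> {time: {asset: max value}}."""
    wide = defaultdict(dict)
    for time, asset, value in records:
        row = wide[time]  # create the row even if every value at this time is null
        if value is None:
            continue  # like Spark's max(), ignore nulls
        current = row.get(asset)
        row[asset] = value if current is None else max(current, value)
    return {t: wide[t] for t in sorted(wide)}


records = [("t1", "A", 1.0), ("t1", "A", 3.0), ("t1", "B", 2.0), ("t2", "A", None)]
print(pivot_max(records))
```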

- The **get_alerts** function: given a site name, finds the maximum alert_level for each asset and returns a list of [asset, alert_level] pairs.

In [0]:
def get_alerts(site_name):
  
  """
  Find the maximum alert_level for each asset at a site and return a list of
  [asset, alert_level] pairs, e.g. [['asset', 'alert_level']], keeping only
  assets with more than one record.

  Params:
  site_name : site_level, e.g. ABBESS RODING STW

  Outputs: list of lists
  """
  
  # enable Arrow for faster Spark -> pandas conversion (Spark 3.x key)
  spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
  
  # maximum alert_level for each asset at the site
  alerts = (df_hsi_data.filter(df_hsi_data['site_level'] == site_name)
            .groupBy('asset')
            .agg({'alert_level': 'max'})
            .toPandas())
  
  # number of records per asset, indexed by asset name; looked up by name because
  # the row order of two separate groupBy results is not guaranteed to match
  site_counts = get_asset_name_count(site_name).toPandas().set_index('asset')['count']
  
  asset_alerts = []
  for asset, value in zip(alerts['asset'], alerts['max(alert_level)']):
    # keep assets with more than one record
    if site_counts.get(asset, 0) > 1:
      asset_alerts.append([asset, str(value)])
  
  return asset_alerts
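`get_alerts` drops assets with only a single record. A more graded data-quality check is the fraction of expected timestamps actually present for each asset. A minimal sketch, assuming hourly readings (as in the sample outputs) and on-the-hour `start`/`end` bounds; `hourly_completeness` is a hypothetical helper that would be fed the timestamps of one asset.

```python
from datetime import datetime, timedelta


def hourly_completeness(timestamps, start, end):
    """Fraction of hourly slots in [start, end] with at least one reading.

    Timestamps are truncated to the hour before counting.
    """
    expected = int((end - start) / timedelta(hours=1)) + 1
    observed = set()
    for t in timestamps:
        hour = t.replace(minute=0, second=0, microsecond=0)
        if start <= hour <= end:
            observed.add(hour)
    return len(observed) / expected


start = datetime(2021, 4, 6, 0)
end = datetime(2021, 4, 6, 9)
stamps = [
    datetime(2021, 4, 6, 0), datetime(2021, 4, 6, 1), datetime(2021, 4, 6, 2),
    datetime(2021, 4, 6, 2, 30), datetime(2021, 4, 6, 5), datetime(2021, 4, 7, 0),
]
print(hourly_completeness(stamps, start, end))
```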


- The **plot_graphs** function: given a site name, displays one subplot per asset, overlaying the asset's values and its alert level over time.

In [0]:
def plot_graphs(site_name):
  """
  Display, for one site, a subplot per asset overlaying the asset's values
  and its alert level against time.
  """
  
  # enable Arrow for faster Spark -> pandas conversion (Spark 3.x key)
  spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
  
  # pandas dataframe with asset values and alert-level columns
  df = data_collector(site_name).toPandas()
  
  # list of [asset, alert_level] pairs
  asset_alerts = get_alerts(site_name)
  if not asset_alerts:
    print("No assets with more than one record for {}".format(site_name))
    return

  # one subplot row per asset
  fig = make_subplots(rows=len(asset_alerts), cols=1,
                      shared_xaxes=True,
                      vertical_spacing=0.02)
  
  for i, asset_alert in enumerate(asset_alerts):
    # draw both the asset column and its alert-level column on row i + 1
    for column in asset_alert:
      fig.add_trace(go.Scatter(
        x=df['time'],
        y=df[column],
        mode="lines",
        name=column,
        ), row=i + 1, col=1)

  # final layout
  fig.update_layout(
      autosize=False, width=1500,
      height=1000, title_text="{} assets vs time".format(site_name),
      legend_title="variable")

  fig.show()

In [0]:
plot_graphs('WITNEY STW')
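Beyond plotting, the periods during which an asset sits above its alert level can be extracted as discrete events, which would make it easier to compare timings across STWs, SLMs, and SPSs. A minimal sketch, assuming readings are sorted by time and that a gap of more than one hour (the assumed HSI resolution) starts a new event; `exceedance_events` and `max_gap` are hypothetical names.

```python
from datetime import datetime, timedelta


def exceedance_events(times, values, alert_level, max_gap=timedelta(hours=1)):
    """Group consecutive readings above `alert_level` into (start, end) events.

    Assumes `times` is sorted ascending; None values count as 'not exceeding'.
    """
    events = []
    start = prev = None
    for t, v in zip(times, values):
        above = v is not None and v > alert_level
        if above and start is not None and t - prev <= max_gap:
            prev = t  # extend the current event
        elif above:
            if start is not None:
                events.append((start, prev))  # close the event interrupted by a gap
            start = prev = t  # begin a new event
        elif start is not None:
            events.append((start, prev))  # reading back below the level ends the event
            start = prev = None
    if start is not None:
        events.append((start, prev))
    return events


base = datetime(2021, 4, 6, 0)
times = [base + timedelta(hours=h) for h in range(6)]
events = exceedance_events(times, [100, 300, 310, 200, 290, 295], alert_level=282.0)
for event_start, event_end in events:
    print(event_start, "->", event_end)
```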