Get hourly readings for past 3 hours

In [1]:
select (regexp_matches(id, '^(.*)_\d{10}$'))[1] as station,
        hour,
        avg_temperature,
        avg_humidity,
        wind_speed_avg_mph,
        wind_speed_direction,
        rainfall_hour
from public.wm_hourly_readings
where hour > ((CURRENT_TIMESTAMP at time zone 'cdt') - interval '3 hours')
order by hour desc;

station,hour,avg_temperature,avg_humidity,wind_speed_avg_mph,wind_speed_direction,rainfall_hour
pico,2023-07-24T08:00:00,72.5779464285713,82.3159226190476,0.746324795525931,SW,0.0
pico,2023-07-24T07:00:00,67.4220183486239,88.9253822629969,0.746324795525931,SW,0.0


Get latest set of readings in JSON format

In [2]:
select json_agg(wm_latest_readings)->0 from wm_latest_readings where station='pico' limit 1;

Column1
"{""station"": ""pico"", ""avg_temperature"": 78.8, ""avg_humidity"": 64, ""wind_speed_gust_mph"": 1.49264959105186, ""wind_speed_avg_mph"": 1.49264959105186, ""wind_speed_direction"": ""W"", ""rainfall_hour"": 0, ""rainfall_day"": 0, ""updated_at"": ""2023-07-24T09:50:01.762378""}"


formatted output from query above:  
```
{
  "station": "pico",
  "avg_temperature": 78.8,
  "avg_humidity": 64,
  "wind_speed_gust_mph": 1.49264959105186,
  "wind_speed_avg_mph": 1.49264959105186,
  "wind_speed_direction": "W",
  "rainfall_hour": 0,
  "rainfall_day": 0,
  "updated_at": "2023-07-24T09:50:01.762378"
}
```

Function to get dewpoint (inputs: temperature, humidity)

In [None]:
CREATE OR REPLACE FUNCTION dewpoint(temperature float, humidity float) RETURNS float AS $$
        BEGIN
                RETURN 243.04*(ln(humidity/100)+((17.625*temperature)/(243.04+temperature)))/(17.625-ln(humidity/100)-((17.625*temperature)/(243.04+temperature)));
        END;
$$ LANGUAGE plpgsql;

Daily Summaries Table

wm\_daily\_summaries DDL

In [1]:
SELECT
date, avg_temperature, avg_humidity
 FROM wm_daily_summaries ORDER BY date ASC LIMIT 7;

date,avg_temperature,avg_humidity
2023-07-12T00:00:00,70.0278124999999,
2023-07-12T00:00:00,72.2516501623377,53.090604752886
2023-07-13T00:00:00,77.1840554402887,61.5611636347221
2023-07-14T00:00:00,76.1019838518164,60.6416355288254
2023-07-15T00:00:00,72.5952636044175,55.2594958815145
2023-07-16T00:00:00,71.4610505153607,50.199375638587
2023-07-17T00:00:00,67.9352073634251,57.7044724996956


In [None]:
-- DROP TABLE wm_daily_summaries;
CREATE TABLE wm_daily_summaries (
        id varchar(100) primary key not null unique,
        date timestamp,
        station varchar(50),
        avg_temperature float,
        max_temperature float,
        min_temperature float,
        avg_humidity float,
        max_humidity float,
        min_humidity float,
        wind_speed_avg_mph float,
        max_wind_speed_gust_mph float,
        wind_speed_direction char(5),
        num_records int
);
create unique index idx_wm_daily_summaries_id on wm_daily_summaries (id);


/* get yesterdays daily summaries */


CREATE OR REPLACE FUNCTION public.wm_update_yesterday_daily_summaries(
	)
    RETURNS void
    LANGUAGE 'sql'
    COST 100
    VOLATILE STRICT PARALLEL UNSAFE
AS $BODY$
INSERT INTO wm_daily_summaries
SELECT
station||'_'||to_char(date_trunc('day', hour::timestamptz AT TIME ZONE 'cdt'), 'YYYYMMDDHH24') as id,
date_trunc('day', hour::timestamptz AT TIME ZONE 'cdt') as day,
station,
avg(avg_temperature) as avg_temperature,
max(max_temperature) as max_temperature,
min(min_temperature) as min_temperature,
avg(avg_humidity) as avg_humidity,
max(max_humidity) as max_humidity,
min(min_humidity) as min_humidity,
avg(wind_speed_avg_mph) as wind_speed_avg_mph,
max(wind_speed_gust_mph) as max_wind_speed_gust_mph,
mode() WITHIN GROUP (ORDER BY wind_speed_direction) AS wind_speed_direction,
count(*) as num_records
FROM wm_hourly_readings
WHERE  date_trunc('day', hour::timestamptz AT TIME ZONE 'cdt') = date_trunc('day', current_timestamp AT TIME ZONE 'cdt') - INTERVAL '1 day' -- yesterday's summaries
-- WHERE  date_trunc('day', hour::timestamptz AT TIME ZONE 'cdt') < date_trunc('day', current_timestamp AT TIME ZONE 'cdt') -- exclude today's data, initial fill
GROUP BY 1,2,3
ORDER BY 1 DESC
ON CONFLICT(id)
DO UPDATE
SET avg_temperature = EXCLUDED.avg_temperature, min_temperature = EXCLUDED.min_temperature, max_temperature = EXCLUDED.max_temperature,
avg_humidity = EXCLUDED.avg_humidity, min_humidity = EXCLUDED.min_humidity, max_humidity = EXCLUDED.max_humidity, 
wind_speed_avg_mph = EXCLUDED.wind_speed_avg_mph, max_wind_speed_gust_mph = EXCLUDED.max_wind_speed_gust_mph, wind_speed_direction = EXCLUDED.wind_speed_direction,
num_records = EXCLUDED.num_records;

$BODY$;

Hourly Readings (aggregations)

wm\_hourly\_readings

In [None]:
/* HOURLY */

CREATE OR REPLACE FUNCTION public.wm_update_hourly_summaries(
	)
    RETURNS void
    LANGUAGE 'sql'
    COST 100
    VOLATILE STRICT PARALLEL UNSAFE
AS $BODY$

-- TEMPERATURE
INSERT INTO wm_hourly_readings (id, hour, station, avg_temperature, min_temperature, max_temperature)
SELECT
	SPLIT_PART(topic, '/', 2)||'_'||to_char(date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt'), 'YYYYMMDDHH24') as id,
	date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt') as hour,
	SPLIT_PART(topic, '/', 2) as station,
	avg(text::float) AS avg_temperature,
        min(text::float) AS min_temperature,
        max(text::float) AS max_temperature
FROM journal
WHERE 
	topic = 'iot/pico/temperature_avg'
	AND text::float < 118 AND text::float > -118 -- weed out outliers/erroneous readings
	AND date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt') = date_trunc('hour', current_timestamp AT TIME ZONE 'cdt' - INTERVAL '1 hour')-- previous hour
GROUP BY 1,2,3
ORDER BY 1,2
ON CONFLICT(id, hour)
DO UPDATE
SET avg_temperature = EXCLUDED.avg_temperature, min_temperature = EXCLUDED.min_temperature, max_temperature = EXCLUDED.max_temperature;

-- HUMIDITY
INSERT INTO wm_hourly_readings (id, hour, station, avg_humidity, min_humidity, max_humidity)
SELECT
	SPLIT_PART(topic, '/', 2)||'_'||to_char(date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt'), 'YYYYMMDDHH24') as id,
	date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt') as hour,
	SPLIT_PART(topic, '/', 2) as station,
	avg(text::float) AS avg_humidity,
        min(text::float) AS min_humidity,
        max(text::float) AS max_humidity
FROM journal
WHERE 
	topic = 'iot/pico/humidity_avg'
	AND text::float < 118 AND text::float > -118 -- weed out outliers/erroneous readings
	AND date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt') = date_trunc('hour', current_timestamp AT TIME ZONE 'cdt' - INTERVAL '1 hour')-- previous hour
GROUP BY 1,2,3 ORDER BY 1,2
ON CONFLICT(id, hour) DO UPDATE SET avg_humidity = EXCLUDED.avg_humidity, min_humidity = EXCLUDED.min_humidity, max_humidity = EXCLUDED.max_humidity;

-- WIND AND RAIN
INSERT INTO wm_hourly_readings (id, hour, station, rainfall_hour, rainfall_day, wind_speed_avg_mph, wind_speed_gust_mph, wind_speed_direction)
SELECT
	SPLIT_PART(topic, '/', 2)||'_'||to_char(date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt'), 'YYYYMMDDHH24') as id,
	date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt') as hour,
	SPLIT_PART(topic, '/', 2) as station,
	max((data->'rain_hr'->'measurement')::float) AS rainfall_hour,
	max((data->'rain_day'->'measurement')::float) AS rainfall_day,
	max((data->'wind'->'speed_mph')::float) AS wind_speed_avg_mph,
	max((data->'wind'->'gust_mph')::float) AS wind_speed_gust_mph,
	mode() WITHIN GROUP (ORDER BY data->'wind'->'direction') AS wind_speed_direction
FROM journal
WHERE
	topic = 'iot/pico/readings_json'
	--AND data->'wind' IS NOT null
	AND date_trunc('hour', time::timestamptz AT TIME ZONE 'cdt') = date_trunc('hour', current_timestamp AT TIME ZONE 'cdt' - INTERVAL '1 hour')-- previous hour
GROUP BY 1,2,3 ORDER BY 1,2
ON CONFLICT(id, hour) DO UPDATE SET
  rainfall_hour = EXCLUDED.rainfall_hour,
  rainfall_day = EXCLUDED.rainfall_day,
  wind_speed_avg_mph = EXCLUDED.wind_speed_avg_mph,
  wind_speed_gust_mph = EXCLUDED.wind_speed_gust_mph,
  wind_speed_direction = EXCLUDED.wind_speed_direction;

$BODY$;

In [14]:
SELECT *
FROM
    (SELECT hour,
            avg_temperature,
            avg_humidity
     FROM wm_hourly_readings
     ORDER BY hour desc
     LIMIT 100) t
ORDER BY hour asc;

hour,avg_temperature,avg_humidity
2023-07-16T11:00:00,74.8563106796115,49.197572815534
2023-07-16T12:00:00,78.4085,41.9603333333333
2023-07-16T13:00:00,85.3851485148515,33.2739273927393
2023-07-16T14:00:00,93.1568932038835,25.0480582524272
2023-07-16T15:00:00,95.1711926605505,21.6562691131498
2023-07-16T16:00:00,88.4463106796117,23.7933656957929
2023-07-16T17:00:00,80.2616964285715,27.7967261904762
2023-07-16T18:00:00,76.5760869565217,31.5010144927536
2023-07-16T19:00:00,73.5978723404255,35.4358156028369
2023-07-16T20:00:00,70.7327272727274,39.430303030303


Latest Readings

wm\_latest\_readings

In [None]:
/*
CREATE TABLE wm_latest_readings (
station varchar(100) unique primary key not null,
avg_temperature float,
avg_humidity float,
wind_speed_gust_mph float,
wind_speed_avg_mph float,
wind_speed_direction varchar(5),
rainfall_hour float,
rainfall_day float,
updated_at timestamp
);
alter table wm_latest_readings rename column ts to updated_at;
create unique index idx_wm_latest_readings_station on wm_latest_readings (station);
*/

/*
This will run and look back some amount of time (5 mins, probably), aggregate data, and post to a table

You can adjust or remove the WHERE clause to adjust how far back the query will go

You could e.g. use cron to run every 5min (remove the spaces between `* / 5`):

```
# m h  dom mon dow   command
* / 5 * * * * echo "select wm_update_latest_readings();" | psql -h localhost -U username -d database
```

A `~/.pgpass` file can be used to store the password for psql
*/

CREATE OR REPLACE FUNCTION public.wm_update_latest_readings(
	)
    RETURNS void
    LANGUAGE 'sql'
    COST 100
    VOLATILE STRICT PARALLEL UNSAFE
AS $BODY$

-- TEMPERATURE
INSERT INTO wm_latest_readings (station, avg_temperature, updated_at)
SELECT
	SPLIT_PART(topic, '/', 2) as station,
	avg(text::float) AS avg_temperature,
    current_timestamp AT TIME ZONE 'cdt' as updated_at
    -- ,current_timestamp AT TIME ZONE 'cdt' - INTERVAL '5 minute' as since
FROM journal
WHERE 
	topic = 'iot/pico/temperature_avg'
	AND text::float < 118 AND text::float > -118 -- weed out outliers/erroneous readings
	AND  time::timestamptz AT TIME ZONE 'cdt' >= current_timestamp AT TIME ZONE 'cdt' - INTERVAL '5 minute' -- last 5 minutes
GROUP BY 1
ORDER BY 1
ON CONFLICT(station)
DO UPDATE
SET avg_temperature = EXCLUDED.avg_temperature, updated_at = EXCLUDED.updated_at;

-- HUMIDITY
INSERT INTO wm_latest_readings (station, avg_humidity, updated_at)
SELECT
	SPLIT_PART(topic, '/', 2) as station,
	avg(text::float) AS avg_humidity,
    current_timestamp AT TIME ZONE 'cdt' as updated_at
    -- ,current_timestamp AT TIME ZONE 'cdt' - INTERVAL '5 minute' as since
FROM journal
WHERE 
	topic = 'iot/pico/humidity_avg'
	AND text::float < 118 AND text::float > -118 -- weed out outliers/erroneous readings
	AND  time::timestamptz AT TIME ZONE 'cdt' >= current_timestamp AT TIME ZONE 'cdt' - INTERVAL '5 minute' -- last 5 minutes
GROUP BY 1
ORDER BY 1
ON CONFLICT(station)
DO UPDATE
SET avg_humidity = EXCLUDED.avg_humidity, updated_at = EXCLUDED.updated_at;

-- WIND AND RAIN
INSERT INTO wm_latest_readings (station, rainfall_hour, rainfall_day, wind_speed_avg_mph, wind_speed_gust_mph, wind_speed_direction, updated_at)
SELECT
	SPLIT_PART(topic, '/', 2) as station,
	max((data->'rain_hr'->'measurement')::float) AS rainfall_hour,
	max((data->'rain_day'->'measurement')::float) AS rainfall_day,
	max((data->'wind'->'speed_mph')::float) AS wind_speed_avg_mph,
	max((data->'wind'->'gust_mph')::float) AS wind_speed_gust_mph,
	mode() WITHIN GROUP (ORDER BY data->'wind'->'direction') AS wind_speed_direction,
    current_timestamp AT TIME ZONE 'cdt' as updated_at
FROM journal
WHERE
	topic = 'iot/pico/readings_json'
	--AND data->'wind' IS NOT null
	AND  time::timestamptz AT TIME ZONE 'cdt' >= current_timestamp AT TIME ZONE 'cdt' - INTERVAL '5 minute' -- last 5 minutes
GROUP BY 1 ORDER BY 1
ON CONFLICT(station) DO UPDATE SET
  rainfall_hour = EXCLUDED.rainfall_hour,
  rainfall_day = EXCLUDED.rainfall_day,
  wind_speed_avg_mph = EXCLUDED.wind_speed_avg_mph,
  wind_speed_gust_mph = EXCLUDED.wind_speed_gust_mph,
  wind_speed_direction = EXCLUDED.wind_speed_direction,
  updated_at = EXCLUDED.updated_at;

$BODY$;


SELECT wm_update_latest_readings();

In [2]:
SELECT time,text::float as rain_in_hr FROM journal
WHERE topic='iot/pico/rain_in_hr'
AND date_trunc('day', time at time zone 'cdt') = date_trunc('day', '2023-07-20'::timestamptz at time zone 'cst')
ORDER BY time asc LIMIT 500

time,rain_in_hr
2023-07-19T06:00:00.658103+01:00,0.0
2023-07-19T06:00:10.677381+01:00,0.0
2023-07-19T06:00:20.701182+01:00,0.0
2023-07-19T06:00:30.704640+01:00,0.0
2023-07-19T06:00:40.725596+01:00,0.0
2023-07-19T06:00:50.741452+01:00,0.0
2023-07-19T06:02:00.859829+01:00,0.0
2023-07-19T06:02:10.882351+01:00,0.0
2023-07-19T06:02:20.907193+01:00,0.0
2023-07-19T06:02:30.929919+01:00,0.0
