In [7]:
import httpx
import os
from dotenv import load_dotenv
import pandas as pd

In [8]:
load_dotenv()
api_key= os.getenv("api_key")

In [9]:
headers = {
    "X-API-Key":api_key,
    "accept": "application/json"
}
session = httpx.AsyncClient(headers=headers)

In [7]:
url = "https://api.openaq.org/v3/locations?order_by=id&sort_order=asc&countries_id=135&limit=100&page=1"

In [8]:
response = await session.get(url=url)
result = response.json()

In [9]:
result

{'meta': {'name': 'openaq-api',
  'website': '/',
  'page': 1,
  'limit': 100,
  'found': 96},
 'results': [{'id': 31,
   'name': 'Wekerom-Riemterdijk',
   'locality': 'Wekerom',
   'timezone': 'Europe/Amsterdam',
   'country': {'id': 135, 'code': 'NL', 'name': 'Netherlands'},
   'owner': {'id': 4, 'name': 'Unknown Governmental Organization'},
   'provider': {'id': 70, 'name': 'EEA'},
   'isMobile': False,
   'isMonitor': True,
   'instruments': [{'id': 2, 'name': 'Government Monitor'}],
   'sensors': [{'id': 4650,
     'name': 'pm10 µg/m³',
     'parameter': {'id': 1,
      'name': 'pm10',
      'units': 'µg/m³',
      'displayName': 'PM10'}},
    {'id': 4275693,
     'name': 'no µg/m³',
     'parameter': {'id': 19843,
      'name': 'no',
      'units': 'µg/m³',
      'displayName': 'NO mass'}},
    {'id': 4642,
     'name': 'no2 µg/m³',
     'parameter': {'id': 5,
      'name': 'no2',
      'units': 'µg/m³',
      'displayName': 'NO₂ mass'}},
    {'id': 50,
     'name': 'pm25 µg/m³',

In [10]:
data = result["results"]

In [11]:
df = pd.DataFrame(data)

In [12]:
df = df[df['locality']=='AMSTERDAM']

In [129]:
# number of location in amsterdam 
df['id'].nunique()

13

In [110]:
print(df.head())

   locationId                    location parameter  value  \
0          80  Amsterdam-Van Diemenstraat      pm10   26.7   
1          80  Amsterdam-Van Diemenstraat      pm10   24.5   
2          80  Amsterdam-Van Diemenstraat      pm10   13.1   
3          80  Amsterdam-Van Diemenstraat      pm10   12.3   
4          80  Amsterdam-Van Diemenstraat      pm10   10.4   

                                                                         date  \
0  {'utc': '2024-04-22T08:00:00+00:00', 'local': '2024-04-22T10:00:00+02:00'}   
1  {'utc': '2024-04-22T07:00:00+00:00', 'local': '2024-04-22T09:00:00+02:00'}   
2  {'utc': '2024-04-22T06:00:00+00:00', 'local': '2024-04-22T08:00:00+02:00'}   
3  {'utc': '2024-04-22T05:00:00+00:00', 'local': '2024-04-22T07:00:00+02:00'}   
4  {'utc': '2024-04-22T04:00:00+00:00', 'local': '2024-04-22T06:00:00+02:00'}   

    unit                                                      coordinates  \
0  µg/m³  {'latitude': 52.389982999640004, 'longitude': 4.88781

In [11]:
column_names = df.columns
print(column_names)

Index(['id', 'name', 'locality', 'timezone', 'country', 'owner', 'provider',
       'isMobile', 'isMonitor', 'instruments', 'sensors', 'coordinates',
       'licenses', 'bounds', 'distance', 'datetimeFirst', 'datetimeLast'],
      dtype='object')


In [26]:
#select
turncated_df = df[['name','locality','sensors']]

In [27]:
print(turncated_df)

                                    name   locality  \
12            Amsterdam-Van Diemenstraat  AMSTERDAM   
16                  Amsterdam-Vondelpark  AMSTERDAM   
19                  Amsterdam-Westerpark  AMSTERDAM   
21                     Amsterdam-Hoogtij  AMSTERDAM   
22                 Amsterdam-Einsteinweg  AMSTERDAM   
35                 Amsterdam-Spaarnwoude  AMSTERDAM   
39               Amsterdam-Haarlemmerweg  AMSTERDAM   
41                 Amsterdam-Oude Schans  AMSTERDAM   
52           Amsterdam-Nieuwendammerdijk  AMSTERDAM   
53      Amsterdam-Kantershof (Zuid Oost)  AMSTERDAM   
59         Amsterdam-Jan van Galenstraat  AMSTERDAM   
62  Amsterdam-Sportpark Ookmeer (Osdorp)  AMSTERDAM   
65             Amsterdam-Stadhouderskade  AMSTERDAM   

                                                                                                                                                                                                    sensors  
12  [{'id': 4244, 'name

In [18]:
pd.set_option('display.max_colwidth', 200)
sensors = turncated_df[['sensors']]
print(sensors)

                                                                                                                                                                                                    sensors
12  [{'id': 4244, 'name': 'no2 µg/m³', 'parameter': {'id': 5, 'name': 'no2', 'units': 'µg/m³', 'displayName': 'NO₂ mass'}}, {'id': 4235, 'name': 'pm25 µg/m³', 'parameter': {'id': 2, 'name': 'pm25', 'u...
16  [{'id': 4275709, 'name': 'no µg/m³', 'parameter': {'id': 19843, 'name': 'no', 'units': 'µg/m³', 'displayName': 'NO mass'}}, {'id': 4461, 'name': 'o3 µg/m³', 'parameter': {'id': 3, 'name': 'o3', 'u...
19  [{'id': 4344, 'name': 'pm25 µg/m³', 'parameter': {'id': 2, 'name': 'pm25', 'units': 'µg/m³', 'displayName': 'PM2.5'}}, {'id': 160, 'name': 'so2 µg/m³', 'parameter': {'id': 6, 'name': 'so2', 'units...
21  [{'id': 4112, 'name': 'pm10 µg/m³', 'parameter': {'id': 1, 'name': 'pm10', 'units': 'µg/m³', 'displayName': 'PM10'}}, {'id': 163, 'name': 'so2 µg/m³', 'parameter': {'id': 6, 'name'

# Get measurements with sensor id

In [13]:
url = 'https://api.openaq.org/v3/sensors/4244/measurements?period_name=hour&date_from=2024-04-21T10%3A00%3A00%2B00%3A00&limit=100&page=1'
response = await session.get(url=url)
print(response)
result = response.json()
print(result['results'])

<Response [200 OK]>
[{'value': 2.2, 'parameter': {'id': 5, 'name': 'no2', 'units': 'µg/m³', 'displayName': None}, 'period': {'label': '1hour', 'interval': '01:00:00', 'datetimeFrom': {'utc': '2024-04-21T10:00:00+00:00', 'local': '2024-04-21T12:00:00+02:00'}, 'datetimeTo': {'utc': '2024-04-21T11:00:00+00:00', 'local': '2024-04-21T13:00:00+02:00'}}, 'coordinates': None, 'summary': {'min': 2.2, 'q02': 2.2, 'q25': 2.2, 'median': 2.2, 'q75': 2.2, 'q98': 2.2, 'max': 2.2, 'avg': None, 'sd': None}, 'coverage': {'expectedCount': 1, 'expectedInterval': '01:00:00', 'observedCount': 1, 'observedInterval': '01:00:00', 'percentComplete': 100.0, 'percentCoverage': 100.0, 'datetimeFrom': {'utc': '2024-04-21T11:00:00+00:00', 'local': '2024-04-21T13:00:00+02:00'}, 'datetimeTo': {'utc': '2024-04-21T11:00:00+00:00', 'local': '2024-04-21T13:00:00+02:00'}}}, {'value': 3.2, 'parameter': {'id': 5, 'name': 'no2', 'units': 'µg/m³', 'displayName': None}, 'period': {'label': '1hour', 'interval': '01:00:00', 'date

In [14]:
from datetime import datetime, timedelta

# Get the current date and time
current_datetime = datetime.now()

# Calculate the datetime 24 hours ago
datetime_24_hours_ago = current_datetime - timedelta(hours=24)

# Print the result
print("24 hours ago was:", datetime_24_hours_ago)

datetime_str = datetime_24_hours_ago.strftime('%Y-%m-%dT%H:%M:%S%z')

print(datetime_str)

datetime_str_encoded = datetime_str.replace(':', '%3A').replace('+', '%2B')

print(datetime_str_encoded)

24 hours ago was: 2024-04-21 18:48:20.168772
2024-04-21T18:48:20
2024-04-21T18%3A48%3A20


In [15]:
async def getLastDayMeasurements(sensorId, session):
    current_datetime = datetime.now()
    datetime_24_hours_ago = current_datetime - timedelta(hours=24)
    # Convert datetime object to string with timezone offset
    datetime_str = datetime_24_hours_ago.strftime('%Y-%m-%dT%H:%M:%S%z')
    # Replace characters with URL-encoded equivalents
    datetime_str_encoded = datetime_str.replace(':', '%3A').replace('+', '%2B')
    url = f'https://api.openaq.org/v3/sensors/{sensorId}/measurements?period_name=hour&date_from={datetime_str_encoded}&limit=100&page=1'
    # print(url)
    response = await session.get(url=url,timeout=10)
    if response.status_code == 500:
        print(f'Internal error for {sensorId}')
    result = response.json()
    return result['results']

In [16]:
res = await getLastDayMeasurements(sensorId=4461,session=session)
res

[{'value': 25.0,
  'parameter': {'id': 3, 'name': 'o3', 'units': 'µg/m³', 'displayName': None},
  'period': {'label': '1hour',
   'interval': '01:00:00',
   'datetimeFrom': {'utc': '2024-04-22T06:00:00+00:00',
    'local': '2024-04-22T08:00:00+02:00'},
   'datetimeTo': {'utc': '2024-04-22T07:00:00+00:00',
    'local': '2024-04-22T09:00:00+02:00'}},
  'coordinates': None,
  'summary': {'min': 25.3,
   'q02': 25.3,
   'q25': 25.3,
   'median': 25.3,
   'q75': 25.3,
   'q98': 25.3,
   'max': 25.3,
   'avg': None,
   'sd': None},
  'coverage': {'expectedCount': 1,
   'expectedInterval': '01:00:00',
   'observedCount': 1,
   'observedInterval': '01:00:00',
   'percentComplete': 100.0,
   'percentCoverage': 100.0,
   'datetimeFrom': {'utc': '2024-04-22T07:00:00+00:00',
    'local': '2024-04-22T09:00:00+02:00'},
   'datetimeTo': {'utc': '2024-04-22T07:00:00+00:00',
    'local': '2024-04-22T09:00:00+02:00'}}},
 {'value': 11.0,
  'parameter': {'id': 3, 'name': 'o3', 'units': 'µg/m³', 'displayNa

In [17]:
def getAvgSensor(measurements):
    sum = 0
    name = measurements[0]['parameter']['name']
    unit = measurements[0]['parameter']['units']
    for measurement in measurements:
        sum += measurement['value']
    avg = sum / len(measurements)
    return {"name":name,"unit":unit,"avg":avg}
    

In [18]:
getAvgSensor(res)

{'name': 'o3', 'unit': 'µg/m³', 'avg': 65.22222222222223}

In [4]:
from datetime import datetime, timedelta

In [10]:
current_datetime = datetime.now()
datetime_24_hours_ago = current_datetime - timedelta(hours=24)    
# Convert datetime object to string with timezone offset
to_datetime_str = current_datetime.strftime('%Y-%m-%dT%H:%M:%S%z')
before_datetime_str = datetime_24_hours_ago.strftime('%Y-%m-%dT%H:%M:%S%z')
# Replace characters with URL-encoded equivalents
to_datetime_str_encoded = to_datetime_str.replace(':', '%3A').replace('+', '%2B')
before_datetime_str_encoded = before_datetime_str.replace(':', '%3A').replace('+', '%2B')

city = "AMSTERDAM"

In [11]:
url = f'https://api.openaq.org/v2/measurements?date_from={before_datetime_str_encoded}&date_to={to_datetime_str_encoded}&limit=1000&page=1&offset=0&sort=desc&radius=1000&city={city}&order_by=datetime'
response = await session.get(url=url)
result = response.json()
result = result['results']
len(result)

399

In [12]:
df = pd.DataFrame(result)

In [13]:
dimenstraat_df = df[df["locationId"]==80]
dimenstraat_df.head()

Unnamed: 0,locationId,location,parameter,value,date,unit,coordinates,country,city,isMobile,isAnalysis,entity,sensorType
0,80,Amsterdam-Van Diemenstraat,pm10,8.6,"{'utc': '2024-04-23T01:00:00+00:00', 'local': ...",µg/m³,"{'latitude': 52.38998299964, 'longitude': 4.88...",NL,,False,,Governmental Organization,reference grade
1,80,Amsterdam-Van Diemenstraat,pm10,5.2,"{'utc': '2024-04-23T00:00:00+00:00', 'local': ...",µg/m³,"{'latitude': 52.38998299964, 'longitude': 4.88...",NL,,False,,Governmental Organization,reference grade
2,80,Amsterdam-Van Diemenstraat,pm10,6.1,"{'utc': '2024-04-22T23:00:00+00:00', 'local': ...",µg/m³,"{'latitude': 52.38998299964, 'longitude': 4.88...",NL,,False,,Governmental Organization,reference grade
3,80,Amsterdam-Van Diemenstraat,pm10,9.6,"{'utc': '2024-04-22T22:00:00+00:00', 'local': ...",µg/m³,"{'latitude': 52.38998299964, 'longitude': 4.88...",NL,,False,,Governmental Organization,reference grade
4,80,Amsterdam-Van Diemenstraat,pm10,5.7,"{'utc': '2024-04-22T21:00:00+00:00', 'local': ...",µg/m³,"{'latitude': 52.38998299964, 'longitude': 4.88...",NL,,False,,Governmental Organization,reference grade


In [14]:
grouped_data = df.groupby(['locationId','location', 'parameter','unit'])['value'].mean().reset_index()

grouped_data

Unnamed: 0,locationId,location,parameter,unit,value
0,80,Amsterdam-Van Diemenstraat,no2,µg/m³,9.192308
1,80,Amsterdam-Van Diemenstraat,o3,µg/m³,90.530769
2,80,Amsterdam-Van Diemenstraat,pm10,µg/m³,6.75
3,80,Amsterdam-Van Diemenstraat,pm25,µg/m³,2.884615
4,95,Amsterdam-Vondelpark,no2,µg/m³,10.553846
5,95,Amsterdam-Vondelpark,o3,µg/m³,93.15
6,95,Amsterdam-Vondelpark,pm10,µg/m³,10.408333
7,95,Amsterdam-Vondelpark,pm25,µg/m³,6.2
8,98,Amsterdam-Westerpark,pm10,µg/m³,10.958333
9,98,Amsterdam-Westerpark,pm25,µg/m³,3.423077


In [15]:
avg_amsterdam = grouped_data.groupby(['parameter','unit'])['value'].mean().reset_index()

In [16]:
avg_amsterdam['locationId'] = 0
avg_amsterdam['location'] = "Amsterdam-General"
avg_amsterdam


Unnamed: 0,parameter,unit,value,locationId,location
0,no2,µg/m³,13.978205,0,Amsterdam-General
1,o3,µg/m³,90.091026,0,Amsterdam-General
2,pm10,µg/m³,8.252083,0,Amsterdam-General
3,pm25,µg/m³,4.504396,0,Amsterdam-General
4,so2,µg/m³,1.353846,0,Amsterdam-General


In [17]:
grouped_data = pd.concat([grouped_data,avg_amsterdam],ignore_index=True)
grouped_data

Unnamed: 0,locationId,location,parameter,unit,value
0,80,Amsterdam-Van Diemenstraat,no2,µg/m³,9.192308
1,80,Amsterdam-Van Diemenstraat,o3,µg/m³,90.530769
2,80,Amsterdam-Van Diemenstraat,pm10,µg/m³,6.75
3,80,Amsterdam-Van Diemenstraat,pm25,µg/m³,2.884615
4,95,Amsterdam-Vondelpark,no2,µg/m³,10.553846
5,95,Amsterdam-Vondelpark,o3,µg/m³,93.15
6,95,Amsterdam-Vondelpark,pm10,µg/m³,10.408333
7,95,Amsterdam-Vondelpark,pm25,µg/m³,6.2
8,98,Amsterdam-Westerpark,pm10,µg/m³,10.958333
9,98,Amsterdam-Westerpark,pm25,µg/m³,3.423077


In [18]:
# Location names list amsterdam
location_names = grouped_data['location'].unique()

for location_name in location_names:
    print(location_name)

Amsterdam-Van Diemenstraat
Amsterdam-Vondelpark
Amsterdam-Westerpark
Amsterdam-Hoogtij
Amsterdam-Einsteinweg
Amsterdam-Spaarnwoude
Amsterdam-Haarlemmerweg
Amsterdam-Oude Schans
Amsterdam-Nieuwendammerdijk
Amsterdam-Kantershof (Zuid Oost)
Amsterdam-Jan van Galenstraat
Amsterdam-Sportpark Ookmeer (Osdorp)
Amsterdam-Stadhouderskade
Amsterdam-General


# InfluxDB

In [19]:
from dotenv import load_dotenv
import influxdb_client, os ,time
from influxdb_client import InfluxDBClient, Point, WritePrecision, DeletePredicateRequest
from influxdb_client.client.write_api import SYNCHRONOUS

load_dotenv()

token = os.getenv("INFLUXDB_TOKEN")
org = os.getenv("INFLUXDB_ORG")
url = os.getenv("INFLUXDB_URL")

client = influxdb_client.InfluxDBClient(url=url, token=token, org=org)

print(f'{token} {org} {url}')

v5rg1Z1DD8jBqeaIo9ITKsleRdg-SHPAe88yqyRgr5WcUTHhiqw0qDoxpsVcKm8CFRoLO8gul2_aZrd75uoBLg== ORG_NAME http://localhost:8086


add data

In [20]:


bucket="Demo_Data_Pipeline"

write_api = client.write_api(write_options=SYNCHRONOUS)
   
for index,row in grouped_data.iterrows():
  point = (
    Point("amsterdam_measurement")
    .tag("location", row['location'])
    .tag("unit",row['unit'])
    .field(row['parameter'],row['value'])
  )
  write_api.write(bucket=bucket, org="ORG_NAME", record=point)


read data

In [21]:
query_api = client.query_api()

query = """from(bucket: "Demo_Data_Pipeline")
 |> range(start: -10m)
 |> filter(fn: (r) => r._measurement == "amsterdam_measurement")"""
tables = query_api.query(query, org="ORG_NAME")

for table in tables:
  for record in table.records:
    print(record)


FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': datetime.datetime(2024, 4, 23, 11, 22, 34, 375667, tzinfo=tzutc()), '_stop': datetime.datetime(2024, 4, 23, 11, 32, 34, 375667, tzinfo=tzutc()), '_time': datetime.datetime(2024, 4, 23, 11, 32, 27, 874382, tzinfo=tzutc()), '_value': 38.94615384615385, '_field': 'no2', '_measurement': 'amsterdam_measurement', 'location': 'Amsterdam-Einsteinweg', 'unit': 'µg/m³'}
FluxRecord() table: 1, {'result': '_result', 'table': 1, '_start': datetime.datetime(2024, 4, 23, 11, 22, 34, 375667, tzinfo=tzutc()), '_stop': datetime.datetime(2024, 4, 23, 11, 32, 34, 375667, tzinfo=tzutc()), '_time': datetime.datetime(2024, 4, 23, 11, 32, 27, 877516, tzinfo=tzutc()), '_value': 11.483333333333334, '_field': 'pm10', '_measurement': 'amsterdam_measurement', 'location': 'Amsterdam-Einsteinweg', 'unit': 'µg/m³'}
FluxRecord() table: 2, {'result': '_result', 'table': 2, '_start': datetime.datetime(2024, 4, 23, 11, 22, 34, 375667, tzinfo=tzutc()), '_s

aggregate data

In [67]:
query_api = client.query_api()

query = """from(bucket: "Demo_Data_Pipeline")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "amsterdam_measurement")
  |> mean()"""
tables = query_api.query(query, org="ORG_NAME")

for table in tables:
    for record in table.records:
        print(record)


FluxRecord() table: 0, {'result': '_result', 'table': 0, '_start': datetime.datetime(2024, 4, 22, 9, 23, 58, 684730, tzinfo=tzutc()), '_stop': datetime.datetime(2024, 4, 23, 9, 23, 58, 684730, tzinfo=tzutc()), '_value': 40.775, '_field': 'no2', '_measurement': 'amsterdam_measurement', 'location': 'Amsterdam-Einsteinweg', 'unit': 'µg/m³'}
FluxRecord() table: 1, {'result': '_result', 'table': 1, '_start': datetime.datetime(2024, 4, 22, 9, 23, 58, 684730, tzinfo=tzutc()), '_stop': datetime.datetime(2024, 4, 23, 9, 23, 58, 684730, tzinfo=tzutc()), '_value': 13.375155279503105, '_field': 'pm10', '_measurement': 'amsterdam_measurement', 'location': 'Amsterdam-Einsteinweg', 'unit': 'µg/m³'}
FluxRecord() table: 2, {'result': '_result', 'table': 2, '_start': datetime.datetime(2024, 4, 22, 9, 23, 58, 684730, tzinfo=tzutc()), '_stop': datetime.datetime(2024, 4, 23, 9, 23, 58, 684730, tzinfo=tzutc()), '_value': 7.437101449275363, '_field': 'pm25', '_measurement': 'amsterdam_measurement', 'location

delete data

In [None]:
# Specify the start and end time for the data deletion
start_time = datetime(2024, 4, 22, 14, 55, 0)  # Replace with your start time
end_time = datetime(2024, 4, 22, 15, 5, 0)    # Replace with your end time

# Create a DeletePredicateRequest to specify the predicate for data deletion
delete_request = DeletePredicateRequest()
delete_request.start = start_time
delete_request.stop = end_time
delete_request.predicate = '_measurement="' + "measurement1" + '"'

client.delete_api