<a href="https://colab.research.google.com/github/ucguate/climahub-etl/blob/panama/ETL_climahub_PA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
# Psycopg2 module installation
!pip install psycopg2
!pip install requests



In [229]:
# importing psycopg2 
import psycopg2 
from psycopg2 import Error
from pprint import pprint
# import requests for rest requests 
import requests
# importing json module
import json
# importing pandas to manage Dictionaries as pandas DataFrames
import pandas as pd
import re 

In [17]:
# manipulate data functions
def get_metar(zoom = 18, filter = 'prior', density = 0, taf = 'false', bbox = '-104.94370625,1.7950665583439,-60.998393750001,28.156120443236', timeout = 30):
  """Fetch the current HIDROMET (ETESA, Panama) sensor readings as a DataFrame.

  Despite its name, this queries the ``/hidromet`` endpoint, not a METAR feed.

  Args:
    zoom, filter, density, taf, bbox: query-string parameters forwarded
      unchanged to the endpoint.
    timeout: seconds to wait for the HTTP response before giving up.

  Returns:
    pandas.DataFrame built from the ``"sensores"`` list of the JSON payload
    (the notebook output shows one row per sensor type with columns
    ``codigo``, ``nombre`` and ``estaciones``).

  Raises:
    requests.HTTPError: if the endpoint answers with a 4xx/5xx status.
    requests.Timeout: if no response arrives within ``timeout`` seconds.
    KeyError: if the payload has no ``"sensores"`` key.
  """
  query_params = {
      'zoom': zoom,
      'filter': filter,
      'density': density,
      'taf': taf,
      'bbox': bbox
  }
  response = requests.get(
      "https://927d1d30.us-south.apigw.appdomain.cloud/hidromet",
      params=query_params,
      timeout=timeout,
  )
  # Fail loudly on HTTP errors instead of trying to parse an error page as JSON.
  response.raise_for_status()
  sensors = response.json()["sensores"]
  return pd.DataFrame(sensors)

# DATABASE CONNECTION and Insert Queries


In [208]:
# creating querying functions
def show_tables(cursor):
    """Pretty-print every table that lives outside PostgreSQL's system schemas.

    Args:
        cursor: an open DB-API cursor; the catalog query is executed on it
            and all resulting rows are printed with ``pprint``.
    """
    sql = """
      SELECT *
      FROM pg_catalog.pg_tables
      WHERE schemaname != 'pg_catalog' AND 
          schemaname != 'information_schema';
    """
    cursor.execute(sql)
    rows = cursor.fetchall()
    pprint(rows)

def get_table_cols(table_name, db_cursor=None):
    """Pretty-print and return the column names of a table in the ``public`` schema.

    Args:
        table_name: name of the table to inspect. It is passed as a bound
            query parameter, so it is never spliced into the SQL text.
        db_cursor: cursor to run the query on. When omitted, the module-level
            ``cursor`` (created by the connection cell) is used, which is what
            the original one-argument signature relied on.

    Returns:
        The rows returned by ``fetchall()``: one ``(column_name,)`` tuple per column.
    """
    # The global `cursor` is only looked up when no cursor is supplied.
    cur = cursor if db_cursor is None else db_cursor
    query = """
    SELECT column_name
      FROM information_schema.columns
        WHERE table_schema = 'public'
          AND table_name   = %s
     ;
    """
    cur.execute(query, (table_name,))
    data = cur.fetchall()
    pprint(data)
    return data
  
def station_columns():
  """Pretty-print the column names of the ``station`` table.

  This used to duplicate the information_schema query of ``get_table_cols``
  inline; it now simply delegates to it (on the module-level ``cursor``).

  Returns:
    Whatever ``get_table_cols`` returns for ``'station'``.
  """
  return get_table_cols('station')

def insert_station(station_name, latitude, longitude, source_station_id, original_id, source_id=2):
  """Build an idempotent INSERT statement for one ``station`` row.

  The statement is returned, not executed, so callers can concatenate many
  statements and send them in one ``cursor.execute`` call.

  Args:
    station_name: human-readable station name.
    latitude, longitude: station coordinates (emitted as quoted literals).
    source_station_id: the data source's station identifier; ``insert_data_all``
      looks the station up by this value.
    original_id: in this notebook, the station's key in the payload's
      ``estaciones`` mapping.
    source_id: ClimaHub data-source id; 2 is ETESA - HIDROMET PA.

  Returns:
    str: an ``insert ... on conflict do nothing;`` statement.
  """
  def literal(value):
    # Double any embedded single quote so a value such as "O'Higgins" cannot
    # close the SQL string literal early (breaking the whole batch, or worse,
    # allowing injection). Sufficient with standard_conforming_strings=on,
    # the PostgreSQL default since 9.1.
    return str(value).replace("'", "''")

  values = (source_id, station_name, latitude, longitude, source_station_id, original_id)
  query = """
    insert into station (source_id,station_name,latitude,longitude, source_station_id, original_id) 
      values ( '{}', '{}', '{}', '{}', '{}', '{}') on conflict do nothing;
    """.format(*map(literal, values))
  return query

def insert_data_all(variable_id, data_date, data_value, source_station_id):
  """Build an idempotent INSERT statement for one observation in ``data_all``.

  The station id is resolved inside the statement by a sub-select on
  ``station.source_station_id``; if no station matches, the sub-select
  yields NULL for ``station_id``.

  Args:
    variable_id: integer ClimaHub variable id (emitted unquoted).
    data_date: timestamp string, e.g. ``'2021-06-01 08:00:00'``.
    data_value: measured value as text.
    source_station_id: the data source's station identifier.

  Returns:
    str: an ``INSERT ... on conflict do nothing;`` statement.
  """
  def literal(value):
    # Double embedded single quotes so external values cannot break out of
    # their SQL string literal (valid with standard_conforming_strings=on).
    return str(value).replace("'", "''")

  # NOTE(review): the sub-select does not filter by station.source_id; if two
  # sources ever share a source_station_id it returns several rows and the
  # insert fails. Confirm the station table guarantees uniqueness.
  station_subquery = "(SELECT id FROM station WHERE source_station_id = '{}')".format(
      literal(source_station_id))
  query = """
    INSERT INTO data_all(variable_id,data_date,data_value, station_id)
      VALUES({},'{}','{}',{}) on conflict do nothing;
    """.format(int(variable_id), literal(data_date), literal(data_value), station_subquery)
  return query


In [None]:
import os
from getpass import getpass

# Connection settings come from the environment so no secret is stored in the
# notebook; the password falls back to an interactive prompt.
# NOTE(review): the password that used to be hardcoded here is in the
# repository history and should be rotated.
connection = None
try:
    # Connect to an existing database
    connection = psycopg2.connect(
        user=os.environ.get("CLIMAHUB_DB_USER", "climahub"),
        password=os.environ.get("CLIMAHUB_DB_PASSWORD") or getpass("ClimaHub DB password: "),
        host=os.environ.get("CLIMAHUB_DB_HOST", "138.197.98.178"),
        port=os.environ.get("CLIMAHUB_DB_PORT", "5432"),
        database=os.environ.get("CLIMAHUB_DB_NAME", "climahub"),
    )

    # Create a cursor to perform database operations
    cursor = connection.cursor()
    # Print PostgreSQL details
    print("🟢 CONNECTION SUCCESFUL: PostgreSQL server information")
    print(connection.get_dsn_parameters(), "\n")
    # Executing a SQL query
    cursor.execute("SELECT version();")
    # Fetch result
    record = cursor.fetchone()
    print("You are connected to - ", record, "\n")

    show_tables(cursor)

except (Exception, Error) as error:
    print("❌ Error while connecting to PostgreSQL", error)
    if connection is not None:
        connection.close()
    # Re-raise: later cells use `cursor`/`connection` and would otherwise fail
    # with a confusing NameError instead of the real connection error.
    raise

# The connection is intentionally left open here; the INSERT cell closes it.

# Data manipulation and conversion

In [71]:
# Fetch the latest HIDROMET readings, indexed by sensor code (e.g. "LLUVIA").
dfPA = get_metar().set_index("codigo")
dfPA

Unnamed: 0_level_0,nombre,estaciones
codigo,Unnamed: 1_level_1,Unnamed: 2_level_1
DIR_VIENTO,Dirección del Viento,"{'EL_COPE': {'latitud': '8.6239', 'longitud': ..."
HORA_SOL,Horas de Brillo Solar,"{'CHIMAN 2': {'latitud': '8.7181', 'longitud':..."
HR_PROM,Humedad Relativa Promedio,"{'BOQUETE': {'latitud': '8.7550', 'longitud': ..."
LLUVIA,Lluvia Acumulada 24 horas,"{'ALTO_PACORA': {'latitud': '9.2456', 'longitu..."
NIVEL,Nivel,"{'ANTON_INTERAMERICANA': {'latitud': '8.3994',..."
P_BAROM,Presión Barométrica,"{'CHIMAN 2': {'latitud': '8.7181', 'longitud':..."
RAD_SOLAR,Radiación Solar,"{'BUENA VISTA': {'latitud': '9.2772', 'longitu..."
RAFAGA,Velocidad Máxima del Viento,"{'EL_NARANJAL': {'latitud': '9.1289', 'longitu..."
TEMP_PROM,Temperatura Promedio,"{'BOQUETE': {'latitud': '8.7550', 'longitud': ..."
TEMP_SUELO,Temperatura del Suelo,"{'EL_COPE': {'latitud': '8.6239', 'longitud': ..."


In [51]:
# "codigo" has been the index since the cell above, so it is no longer a
# column; enumerate() supplies the running row position printed first.
for position, (codigo, nombre) in enumerate(dfPA["nombre"].items()):
    print(position, codigo, ' - ', nombre)

0 DIR_VIENTO  -  Dirección del Viento
1 HORA_SOL  -  Horas de Brillo Solar
2 HR_PROM  -  Humedad Relativa Promedio
3 LLUVIA  -  Lluvia Acumulada 24 horas
4 NIVEL  -  Nivel
5 P_BAROM  -  Presión Barométrica
6 RAD_SOLAR  -  Radiación Solar
7 RAFAGA  -  Velocidad Máxima del Viento
8 TEMP_PROM  -  Temperatura Promedio
9 TEMP_SUELO  -  Temperatura del Suelo
10 VEL_VIENTO  -  Velocidad del Viento


ClimaHub variables: mapping from each HIDROMET sensor code to a short ClimaHub code and the ClimaHub `variable_id`

In [198]:
# HIDROMET sensor code (the "codigo" index of dfPA) -> [short ClimaHub code,
# ClimaHub variable_id]. Only index 1 (variable_id) is written to the database.
# TEMP_SUELO is reported by the API but has no mapping here, so it is skipped.
variables = {
  "DIR_VIENTO" : ["WND", 7],
  "HORA_SOL" : ["HSOL", 14],
  "HR_PROM" : ["HRP", 15],
  "LLUVIA" : ["PCP", 5],
  "NIVEL" : ["LEV", 13],
  "P_BAROM" : ["PRS", 10],
  "RAD_SOLAR" : ["RSOL", 16],
  "RAFAGA" : ["WNG", 9],
  "TEMP_PROM" : ["TMP", 3],
  "VEL_VIENTO" : ["WNS", 8]
}
variables

{'DIR_VIENTO': ['WND', 7],
 'HORA_SOL': ['HSOL', 14],
 'HR_PROM': ['HRP', 15],
 'LLUVIA': ['PCP\t', 5],
 'NIVEL': ['LEV', 13],
 'P_BAROM': ['PRS', 10],
 'RAD_SOLAR': ['RSOL', 16],
 'RAFAGA': ['WNG', 9],
 'TEMP_PROM': ['TMP', 3],
 'VEL_VIENTO': ['WNS', 8]}

In [253]:
def convert_date(date):
  """Convert a HIDROMET timestamp to a ``'YYYY-MM-DD HH:MM:00'`` string.

  The input is read as day/month/year with a 12-hour clock, e.g.
  ``'01/06/2021 08:00 AM'`` -> ``'2021-06-01 08:00:00'``. Single-digit
  day, month or hour are accepted, the AM/PM marker is case-insensitive,
  and an optional seconds field is tolerated (seconds are always written
  as ``00``).

  Args:
    date: timestamp string from the ``sensor_fecha`` field.

  Returns:
    str: 24-hour timestamp usable as a PostgreSQL timestamp literal.

  Raises:
    ValueError: if ``date`` does not match the expected layout.
  """
  match = re.match(
      r"\s*(\d{1,2})/(\d{1,2})/(\d{4})\s+(\d{1,2}):(\d{2})(?::\d{2})?\s*([AaPp])\.?\s*[Mm]",
      date)
  if match is None:
    raise ValueError("Unrecognized HIDROMET date: {!r}".format(date))
  day, month, year, hour, minute, meridiem = match.groups()
  # 12-hour -> 24-hour: 12 AM is 00h and 12 PM is 12h, so reduce modulo 12
  # first and add 12 only for PM.
  hour24 = int(hour) % 12 + (12 if meridiem in 'Pp' else 0)
  return '{}-{:0>2}-{:0>2} {:02d}:{}:00'.format(year, month, day, hour24, minute)

def clean_valor(valor):
  """Strip unit suffixes and spaces from a HIDROMET reading, leaving the number.

  e.g. ``'25.3 ºC'`` -> ``'25.3'``, ``'3.5 m/s'`` -> ``'3.5'``.

  Both ``ºC`` (masculine ordinal sign) and ``°C`` (degree sign) are removed;
  previously ``°C`` left a stray ``C`` behind. Alternatives are tried left to
  right at each position, so units starting with ``m`` or ``°`` must come
  before the bare ``m`` / ``°`` alternatives.

  Args:
    valor: raw ``sensor_valor`` string.

  Returns:
    str: the value with recognised units removed; unrecognised units are
    left in place.
  """
  return re.sub(r"watt/m2| |/|m/s|ºC|°C|mbar|msnm|m|%|°", '', valor)

# INSERT

In [None]:
# iterating recognized variables to look for into df
# One batch of statements per recognised variable. GRAND_QUERY is reset for
# every variable so each execute only sends that variable's inserts; before,
# the string kept growing and every execute re-ran all earlier inserts too.
GRAND_QUERY = ''
try:
  for key, value in variables.items():
    print(key, value[0], value[1])
    if key not in dfPA.index:
      # A sensor type can be absent from a given API response; skip it
      # instead of aborting the whole load with a KeyError.
      print('Variable not in API response, skipping:', key, '\n')
      continue
    cur_row = dfPA.loc[key].to_dict()
    print('Iterando:', cur_row["nombre"], '\n')

    GRAND_QUERY = ''
    # iterating across "estaciones"
    for k, v in cur_row["estaciones"].items():
      print(k, v)
      GRAND_QUERY += insert_station(v["nombre"], v["latitud"], v["longitud"], v["numero_estacion"], k)
      GRAND_QUERY += insert_data_all(value[1], convert_date(v["sensor_fecha"]), clean_valor(v["sensor_valor"]), v["numero_estacion"])

    if GRAND_QUERY:
      cursor.execute(GRAND_QUERY)
      connection.commit()
except Exception:
  # Discard the failed batch; batches committed for earlier variables remain.
  connection.rollback()
  raise
finally:
  cursor.close()
  connection.close()
  print("PostgreSQL connection is closed")

'01/06/2021 08:00 AM'

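CLEAN: the complete ETL (fetch from HIDROMET, transform, insert into PostgreSQL) consolidated into a single cell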

In [5]:
# ------------------------------------
# -------------- imports -------------
# ------------------------------------
import psycopg2 
from psycopg2 import Error
from pprint import pprint 
import requests
import json
import pandas as pd
import re 

# ------------------------------------
# ------------- functions ------------
# ------------------------------------
def insert_station(station_name, latitude, longitude, source_station_id, original_id, source_id=2):
  """Build an idempotent INSERT statement for one ``station`` row.

  The statement is returned, not executed, so callers can concatenate many
  statements and send them in one ``cursor.execute`` call.

  Args:
    station_name: human-readable station name.
    latitude, longitude: station coordinates (emitted as quoted literals).
    source_station_id: the data source's station identifier; ``insert_data_all``
      looks the station up by this value.
    original_id: in this notebook, the station's key in the payload's
      ``estaciones`` mapping.
    source_id: ClimaHub data-source id; 2 is ETESA - HIDROMET PA.

  Returns:
    str: an ``insert ... on conflict do nothing;`` statement.
  """
  def literal(value):
    # Double any embedded single quote so a value such as "O'Higgins" cannot
    # close the SQL string literal early (breaking the whole batch, or worse,
    # allowing injection). Sufficient with standard_conforming_strings=on,
    # the PostgreSQL default since 9.1.
    return str(value).replace("'", "''")

  values = (source_id, station_name, latitude, longitude, source_station_id, original_id)
  query = """
    insert into station (source_id,station_name,latitude,longitude, source_station_id, original_id) 
      values ( '{}', '{}', '{}', '{}', '{}', '{}') on conflict do nothing;
    """.format(*map(literal, values))
  return query

def insert_data_all(variable_id, data_date, data_value, source_station_id):
  """Build an idempotent INSERT statement for one observation in ``data_all``.

  The station id is resolved inside the statement by a sub-select on
  ``station.source_station_id``; if no station matches, the sub-select
  yields NULL for ``station_id``.

  Args:
    variable_id: integer ClimaHub variable id (emitted unquoted).
    data_date: timestamp string, e.g. ``'2021-06-01 08:00:00'``.
    data_value: measured value as text.
    source_station_id: the data source's station identifier.

  Returns:
    str: an ``INSERT ... on conflict do nothing;`` statement.
  """
  def literal(value):
    # Double embedded single quotes so external values cannot break out of
    # their SQL string literal (valid with standard_conforming_strings=on).
    return str(value).replace("'", "''")

  # NOTE(review): the sub-select does not filter by station.source_id; if two
  # sources ever share a source_station_id it returns several rows and the
  # insert fails. Confirm the station table guarantees uniqueness.
  station_subquery = "(SELECT id FROM station WHERE source_station_id = '{}')".format(
      literal(source_station_id))
  query = """
    INSERT INTO data_all(variable_id,data_date,data_value, station_id)
      VALUES({},'{}','{}',{}) on conflict do nothing;
    """.format(int(variable_id), literal(data_date), literal(data_value), station_subquery)
  return query
def convert_date(date):
  """Convert a HIDROMET timestamp to a ``'YYYY-MM-DD HH:MM:00'`` string.

  The input is read as day/month/year with a 12-hour clock, e.g.
  ``'01/06/2021 08:00 AM'`` -> ``'2021-06-01 08:00:00'``. Single-digit
  day, month or hour are accepted, the AM/PM marker is case-insensitive,
  and an optional seconds field is tolerated (seconds are always written
  as ``00``).

  Args:
    date: timestamp string from the ``sensor_fecha`` field.

  Returns:
    str: 24-hour timestamp usable as a PostgreSQL timestamp literal.

  Raises:
    ValueError: if ``date`` does not match the expected layout.
  """
  match = re.match(
      r"\s*(\d{1,2})/(\d{1,2})/(\d{4})\s+(\d{1,2}):(\d{2})(?::\d{2})?\s*([AaPp])\.?\s*[Mm]",
      date)
  if match is None:
    raise ValueError("Unrecognized HIDROMET date: {!r}".format(date))
  day, month, year, hour, minute, meridiem = match.groups()
  # 12-hour -> 24-hour: 12 AM is 00h and 12 PM is 12h, so reduce modulo 12
  # first and add 12 only for PM.
  hour24 = int(hour) % 12 + (12 if meridiem in 'Pp' else 0)
  return '{}-{:0>2}-{:0>2} {:02d}:{}:00'.format(year, month, day, hour24, minute)

def clean_valor(valor):
  """Strip unit suffixes and spaces from a HIDROMET reading, leaving the number.

  e.g. ``'25.3 ºC'`` -> ``'25.3'``, ``'3.5 m/s'`` -> ``'3.5'``.

  Both ``ºC`` (masculine ordinal sign) and ``°C`` (degree sign) are removed;
  previously ``°C`` left a stray ``C`` behind. Alternatives are tried left to
  right at each position, so units starting with ``m`` or ``°`` must come
  before the bare ``m`` / ``°`` alternatives.

  Args:
    valor: raw ``sensor_valor`` string.

  Returns:
    str: the value with recognised units removed; unrecognised units are
    left in place.
  """
  return re.sub(r"watt/m2| |/|m/s|ºC|°C|mbar|msnm|m|%|°", '', valor)
# manipulate data functions
def get_metar(zoom = 18, filter = 'prior', density = 0, taf = 'false', bbox = '-104.94370625,1.7950665583439,-60.998393750001,28.156120443236', timeout = 30):
  """Fetch the current HIDROMET (ETESA, Panama) sensor readings as a DataFrame.

  Despite its name, this queries the ``/hidromet`` endpoint, not a METAR feed.

  Args:
    zoom, filter, density, taf, bbox: query-string parameters forwarded
      unchanged to the endpoint.
    timeout: seconds to wait for the HTTP response before giving up.

  Returns:
    pandas.DataFrame built from the ``"sensores"`` list of the JSON payload
    (the notebook output shows one row per sensor type with columns
    ``codigo``, ``nombre`` and ``estaciones``).

  Raises:
    requests.HTTPError: if the endpoint answers with a 4xx/5xx status.
    requests.Timeout: if no response arrives within ``timeout`` seconds.
    KeyError: if the payload has no ``"sensores"`` key.
  """
  query_params = {
      'zoom': zoom,
      'filter': filter,
      'density': density,
      'taf': taf,
      'bbox': bbox
  }
  response = requests.get(
      "https://927d1d30.us-south.apigw.appdomain.cloud/hidromet",
      params=query_params,
      timeout=timeout,
  )
  # Fail loudly on HTTP errors instead of trying to parse an error page as JSON.
  response.raise_for_status()
  sensors = response.json()["sensores"]
  return pd.DataFrame(sensors)

# ------------------------------------
# ------------ connection ------------
# ------------------------------------  
import os
from getpass import getpass

# Connection settings come from the environment so no secret is stored in the
# notebook; the password falls back to an interactive prompt.
# NOTE(review): the password that used to be hardcoded here is in the
# repository history and should be rotated.
connection = None
try:
    # Connect to an existing database
    connection = psycopg2.connect(
        user=os.environ.get("CLIMAHUB_DB_USER", "climahub"),
        password=os.environ.get("CLIMAHUB_DB_PASSWORD") or getpass("ClimaHub DB password: "),
        host=os.environ.get("CLIMAHUB_DB_HOST", "138.197.98.178"),
        port=os.environ.get("CLIMAHUB_DB_PORT", "5432"),
        database=os.environ.get("CLIMAHUB_DB_NAME", "climahub"),
    )

    # Create a cursor to perform database operations
    cursor = connection.cursor()
    # Print PostgreSQL details
    print("🟢 CONNECTION SUCCESFUL: PostgreSQL server information")
    print(connection.get_dsn_parameters(), "\n")
    # Executing a SQL query
    cursor.execute("SELECT version();")
    # Fetch result
    record = cursor.fetchone()
    print("You are connected to - ", record, "\n")

except (Exception, Error) as error:
    print("❌ Error while connecting to PostgreSQL", error)
    if connection is not None:
        connection.close()
    # Stop here: the load below needs a working cursor and would otherwise die
    # later with a NameError on `cursor`, hiding the real connection error.
    raise
  
# ------------------------------------
# -------------- metar ---------------
# ------------------------------------
dfPA = get_metar().set_index("codigo")
# HIDROMET sensor code -> [short ClimaHub code, ClimaHub variable_id];
# only the variable_id (index 1) is written to the database.
variables = {
  "DIR_VIENTO" : ["WND", 7],
  "HORA_SOL" : ["HSOL", 14],
  "HR_PROM" : ["HRP", 15],
  "LLUVIA" : ["PCP", 5],
  "NIVEL" : ["LEV", 13],
  "P_BAROM" : ["PRS", 10],
  "RAD_SOLAR" : ["RSOL", 16],
  "RAFAGA" : ["WNG", 9],
  "TEMP_PROM" : ["TMP", 3],
  "VEL_VIENTO" : ["WNS", 8]
}
# One batch of statements per recognised variable. GRAND_QUERY is reset for
# every variable so each execute only sends that variable's inserts instead
# of re-running every earlier batch as well.
GRAND_QUERY = ''
try:
  for key, value in variables.items():
    if key not in dfPA.index:
      # A sensor type can be absent from a given API response; skip it
      # instead of aborting the whole load with a KeyError.
      print('Variable not in API response, skipping:', key)
      continue
    cur_row = dfPA.loc[key].to_dict()

    GRAND_QUERY = ''
    # iterating across "estaciones"
    for k, v in cur_row["estaciones"].items():
      GRAND_QUERY += insert_station(v["nombre"], v["latitud"], v["longitud"], v["numero_estacion"], k)
      GRAND_QUERY += insert_data_all(value[1], convert_date(v["sensor_fecha"]), clean_valor(v["sensor_valor"]), v["numero_estacion"])

    if GRAND_QUERY:
      cursor.execute(GRAND_QUERY)
      connection.commit()
except Exception:
  # Discard the failed batch; batches committed for earlier variables remain.
  connection.rollback()
  raise
finally:
  cursor.close()
  connection.close()
  print("PostgreSQL connection is closed")

🟢 CONNECTION SUCCESFUL: PostgreSQL server information
{'user': 'climahub', 'dbname': 'climahub', 'host': '138.197.98.178', 'port': '5432', 'tty': '', 'options': '', 'sslmode': 'prefer', 'sslcompression': '0', 'krbsrvname': 'postgres', 'target_session_attrs': 'any'} 

You are connected to -  ('PostgreSQL 13.2 (Debian 13.2-1.pgdg100+1) on x86_64-pc-linux-gnu, compiled by gcc (Debian 8.3.0-6) 8.3.0, 64-bit',) 

PostgreSQL connection is closed
