In [1]:
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from dotenv import dotenv_values
from urllib.parse import quote

config = dotenv_values(".env")

# Connection details of the input database, read from the IN_* keys of .env.
db_params = {
    'host': config["IN_HOST"],
    'database': config["IN_DB"],
    'user': config["IN_USER"],
    'password': config["IN_PWD"],
    'port': config["IN_PORT"]
}

# Build the SQLAlchemy URL. The password is percent-encoded so characters such as
# '@', ':' or '/' cannot break URL parsing; `db_params` itself keeps the raw values.
url_template = "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}"
url_parts = dict(db_params, password=quote(db_params["password"], safe=""))
conn_str = url_template.format(**url_parts)
# Show the connection target without leaking the password into the notebook output.
print(url_template.format(**dict(url_parts, password="***")))

# Create a SQLAlchemy engine for the input database.
engine = create_engine(conn_str)

In [2]:
import os

# Join every synthetic sensor reading with the metadata of its field and scenario.
sql_query = """
select distinct
    f."name" as field, s."name" as scenario, s.description as scenario_description, s.scenario_arpae_name as arpae, s.scenario_water_name as watering,
    d.unix_timestamp as sensor_timestamp, d.value_type_name as sensor_type, d.x as sensor_x, d.y as sensor_y, d.z as sensor_z, d.value as sensor_value
from synthetic_data d
join synthetic_field f on d.field_name = f."name"
join synthetic_scenario s on d.scenario_name = s."name"
"""

# Use pandas to read the query result into a DataFrame
df = pd.read_sql(sql_query, engine)

# Persist the raw extract; the enrichment step below re-reads this CSV.
# Create the output directory first, otherwise to_csv fails on a fresh checkout.
os.makedirs("data", exist_ok=True)
df.to_csv('data/sensor_ft.csv', index=False)
df

Unnamed: 0,field,scenario,scenario_description,arpae,watering,sensor_timestamp,sensor_type,sensor_x,sensor_y,sensor_z,sensor_value
0,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655809200,GROUND_WATER_POTENTIAL,0.00,0.00,-0.60,-168.330
1,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655809200,GROUND_WATER_POTENTIAL,0.00,0.00,-0.40,-22.770
2,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655809200,GROUND_WATER_POTENTIAL,0.00,0.00,-0.20,-22.480
3,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655809200,GROUND_WATER_POTENTIAL,0.25,0.00,-0.60,-186.310
4,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655809200,GROUND_WATER_POTENTIAL,0.25,0.00,-0.40,-23.730
...,...,...,...,...,...,...,...,...,...,...,...
14329383,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601553600,GROUND_WATER_POTENTIAL,0.95,0.05,-0.45,-41.306
14329384,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601553600,GROUND_WATER_POTENTIAL,0.95,0.05,-0.35,-36.793
14329385,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601553600,GROUND_WATER_POTENTIAL,0.95,0.05,-0.25,-34.385
14329386,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601553600,GROUND_WATER_POTENTIAL,0.95,0.05,-0.15,-33.316


In [3]:
import os

# Join every field/scenario pair with the ARPAE weather series of its scenario.
sql_query = """
select distinct
    sf.field_name as field, s."name" as scenario, s.description as scenario_description, s.scenario_arpae_name as arpae, s.scenario_water_name as watering,
    da.unix_timestamp as arpae_timestamp, da.value_type_name as arpae_type, da.value as arpae_value
from synthetic_field_scenario sf
join synthetic_scenario s on sf.scenario_name = s."name"
join synthetic_scenario_arpae_data da on s.scenario_arpae_name = da.scenario_arpae_name
"""

# Use pandas to read the query result into a DataFrame
df = pd.read_sql(sql_query, engine)

# Persist the raw extract; the enrichment step below re-reads this CSV.
# Create the output directory first, otherwise to_csv fails on a fresh checkout.
os.makedirs("data", exist_ok=True)
df.to_csv('data/weather_ft.csv', index=False)
df

Unnamed: 0,field,scenario,scenario_description,arpae,watering,arpae_timestamp,arpae_type,arpae_value
0,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655244000,AIR_HUMIDITY,88.850
1,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655244000,AIR_TEMPERATURE,17.585
2,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655244000,PRECIPITATIONS,0.000
3,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655244000,RADIATIONS,0.000
4,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655244000,WIND_SPEED,0.900
...,...,...,...,...,...,...,...,...
2040629,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601596800,AIR_HUMIDITY,91.000
2040630,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601596800,AIR_TEMPERATURE,15.400
2040631,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601596800,PRECIPITATIONS,0.000
2040632,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601596800,RADIATIONS,-1.000


In [4]:
import os

# Join every field/scenario pair with the irrigation series of its scenario.
sql_query = """
select distinct
    sf.field_name as field, s."name" as scenario, s.description as scenario_description, s.scenario_arpae_name as arpae, s.scenario_water_name as watering,
    dw.unix_timestamp as water_timestamp, dw.value_type_name as water_type, dw.value as water_value
from synthetic_field_scenario sf
join synthetic_scenario s on sf.scenario_name = s."name"
join synthetic_scenario_water_data dw on s.scenario_water_name = dw.scenario_water_name
"""

# Use pandas to read the query result into a DataFrame
df = pd.read_sql(sql_query, engine)

# Persist the raw extract; the enrichment step below re-reads this CSV.
# Create the output directory first, otherwise to_csv fails on a fresh checkout.
os.makedirs("data", exist_ok=True)
df.to_csv('data/irrigation_ft.csv', index=False)
df

Unnamed: 0,field,scenario,scenario_description,arpae,watering,water_timestamp,water_type,water_value
0,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655244000,IRRIGATIONS,0.0
1,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655247600,IRRIGATIONS,0.0
2,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655251200,IRRIGATIONS,0.0
3,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655254800,IRRIGATIONS,0.0
4,Real Fondo ERRANO,Real Fondo ERRANO 2022,Real data from 21.06.2022 to 17.08.2022,Real Fondo ERRANO 2022,Real Fondo ERRANO 2022,1655258400,IRRIGATIONS,0.0
...,...,...,...,...,...,...,...,...
404847,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601582400,IRRIGATIONS,0.0
404848,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601586000,IRRIGATIONS,0.0
404849,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601589600,IRRIGATIONS,0.0
404850,Synthetic field v.1.0,Synthetic Martorano 2020 v.1.0,Real arpae data - from 01.06.2020 to 01.10.202...,Real Martorano 2020,Synthetic Martorano 2020 v.1.0,1601593200,IRRIGATIONS,0.0


In [5]:
def read_df(file):
    """Load one extracted CSV and normalise its column names.

    The descriptive columns ``field``, ``watering``, ``arpae`` and ``scenario`` are
    renamed with a ``Desc`` suffix. A leading ``sensor_``, ``arpae_`` or ``water_``
    prefix is stripped from every other column, so the sensor, weather and
    irrigation extracts share names such as ``timestamp``, ``type`` and ``value``.

    Parameters
    ----------
    file : str or file-like
        Anything accepted by :func:`pandas.read_csv`.

    Returns
    -------
    pandas.DataFrame
        The loaded frame with renamed columns.
    """
    sdf = pd.read_csv(file)
    sdf = sdf.rename({"field": "fieldDesc", "watering": "wateringDesc", "arpae": "arpaeDesc", "scenario": "scenarioDesc"}, axis=1)

    # Strip only a *leading* prefix: str.replace would also mangle names that merely
    # contain one of these substrings (e.g. "groundwater_level" -> "groundlevel").
    prefixes = ("sensor_", "arpae_", "water_")

    def _strip_prefix(name):
        for prefix in prefixes:
            if name.startswith(prefix):
                return name[len(prefix):]
        return name

    sdf.columns = [_strip_prefix(name) for name in sdf.columns]
    return sdf

In [6]:
import hashlib


def get_last_4_digits(data, n_digits=7):
    """Return a short, deterministic numeric id derived from ``data``.

    Despite the function name, the id has up to ``n_digits`` decimal digits: it is
    the SHA-256 digest of ``data`` (UTF-8 encoded) read as an integer and reduced
    modulo ``10 ** n_digits``. The default of 7 reproduces the historical
    ``% 10000000``, so ids produced earlier are unchanged.

    Distinct inputs can map to the same id; more digits make that less likely.

    Parameters
    ----------
    data : str
        Text to hash.
    n_digits : int, optional
        Number of trailing decimal digits to keep (default 7).

    Returns
    -------
    int
        A value in ``range(10 ** n_digits)``.

    Raises
    ------
    ValueError
        If ``n_digits`` is smaller than 1.
    """
    if n_digits < 1:
        raise ValueError("n_digits must be >= 1")

    digest_hex = hashlib.sha256(data.encode()).hexdigest()
    return int(digest_hex, 16) % (10 ** n_digits)

In [7]:
def extend_df(sdf):
    """Add the derived field/time/agent/location columns to ``sdf`` in place.

    Expects the frame produced by ``read_df``: string columns ``fieldDesc``,
    ``scenarioDesc`` and ``arpaeDesc``, and ``timestamp`` in Unix seconds.

    Columns added, in this order:

    - ``field``: ``"field-<n>"``, where ``n`` is ``get_last_4_digits`` of
      ``"<fieldDesc>-<scenarioDesc>-<arpaeDesc>"``
    - ``datetime``, ``date``
    - ``month``: ``"<year>-<month>"`` (not zero-padded, e.g. ``"2022-6"``)
    - ``year``
    - ``hour``: ``"<date> <hour>:00:00"`` (hour not zero-padded)
    - ``agentType``: always ``"AssignedDevice"``
    - ``timestampReceived``: a copy of ``timestamp``
    - ``delay``: therefore always 0
    - ``province``, ``region``, ``country``, ``owner``: constants

    Returns
    -------
    None
        The frame is modified in place.
    """
    # The field id depends only on three descriptive columns. Hash each distinct
    # combination once instead of once per row.
    field_key = sdf["fieldDesc"] + "-" + sdf["scenarioDesc"] + "-" + sdf["arpaeDesc"]
    field_ids = {key: "field-{}".format(get_last_4_digits(key)) for key in field_key.unique()}
    sdf["field"] = field_key.map(field_ids)

    # Convert once and derive every time column from the same series.
    moment = pd.to_datetime(sdf["timestamp"], unit="s")
    sdf["datetime"] = moment
    sdf["date"] = moment.dt.date
    sdf["month"] = moment.dt.year.astype(str) + "-" + moment.dt.month.astype(str)
    sdf["year"] = moment.dt.year
    sdf["hour"] = sdf["date"].astype(str) + " " + moment.dt.hour.astype(str) + ":00:00"

    sdf["agentType"] = "AssignedDevice"
    sdf["timestampReceived"] = sdf["timestamp"]
    sdf["delay"] = sdf["timestampReceived"] - sdf["timestamp"]
    sdf["province"] = "FE"
    sdf["region"] = "ER"
    sdf["country"] = "IT"
    sdf["owner"] = "Forecasting simulation"

sdf = read_df('data/sensor_ft.csv')
extend_df(sdf)

# Sensors are identified by their (x, y, z) position. Build the strings column-wise
# rather than with a row-wise apply; the result is the same text, e.g. "(0.25,0.0,-0.6)".
sensor_position = "(" + sdf["x"].astype(str) + "," + sdf["y"].astype(str) + "," + sdf["z"].astype(str) + ")"
sdf["agent"] = "sensor-" + sensor_position
sdf["type-ext"] = sdf["type"] + "-" + sensor_position

sdf.to_csv('data/sensor_enr_ft.csv', index=False)
sdf

In [None]:
wdf = read_df('data/weather_ft.csv')
extend_df(wdf)
# All weather rows share one agent label. Their extended type is the plain `type`
# (sensor rows, by contrast, append their position to it).
wdf = wdf.assign(**{"agent": "WeatherStation", "type-ext": wdf["type"]})
wdf.to_csv('data/weather_enr_ft.csv', index=False)
wdf

In [None]:
idf = read_df('data/irrigation_ft.csv')
extend_df(idf)
# All irrigation rows share one agent label. Their extended type is the plain `type`.
idf = idf.assign(**{"agent": "Dripper", "type-ext": idf["type"]})
idf.to_csv('data/irrigation_enr_ft.csv', index=False)
idf

In [None]:
# Output layout: a fact table (ft_*) and dimension tables (dt_*), each mapped to the
# columns of the combined frame it is built from.
tables = {
    "ft_measurement": ["agent", "type", "field", "owner", "timestamp", "value", "delay"],
    "dt_field": ["field", "fieldDesc", "scenarioDesc", "scenario_description", "arpaeDesc", "wateringDesc", "province", "region", "country"],
    "dt_time": ["timestamp", "datetime", "hour", "date", "month", "year"],
    "dt_agent": ["agent", "agentType"],
}

# Union of every column any table needs. dict.fromkeys de-duplicates while keeping
# first-seen order, so `edf` has the same column order in every kernel session
# (the order of a set of strings depends on the interpreter's hash seed).
columns = list(dict.fromkeys(col for table_columns in tables.values() for col in table_columns))

# Stack the three enriched sources into one frame that holds only those columns.
edf = pd.concat([sdf[columns], idf[columns], wdf[columns]], ignore_index=True)

In [None]:
db_name2 = config["OUT_DB"]


def connect(db_name1):
    """Open a psycopg2 connection to database ``db_name1`` on the output server.

    The user, password, host and port come from the ``OUT_*`` entries of ``config``.
    The connection is returned in autocommit mode, which ``CREATE DATABASE``
    requires because it cannot run inside a transaction block.
    """
    out_settings = {
        "user": config["OUT_USER"],
        "password": config["OUT_PWD"],
        "host": config["OUT_HOST"],
        "port": config["OUT_PORT"],
    }
    connection = psycopg2.connect(dbname=db_name1, **out_settings)
    connection.autocommit = True
    return connection

In [None]:
import psycopg2
from psycopg2 import sql
from urllib.parse import quote

# --- 1. Create the output database (OUT_DB) if it does not exist yet ----------
# `connect` targets the OUT_* server. CREATE DATABASE must be issued from an
# existing database, and IN_DB is used for that.
# NOTE(review): this assumes a database named IN_DB also exists on the OUT server.
# The catalog check replaces the old bare `except:`, which reported *any* failure
# (wrong password, unreachable host, ...) as "DB already exists".
admin_conn = connect(config["IN_DB"])
try:
    with admin_conn.cursor() as admin_cursor:
        admin_cursor.execute("SELECT 1 FROM pg_database WHERE datname = %s;", (db_name2,))
        if admin_cursor.fetchone() is None:
            admin_cursor.execute(sql.SQL("CREATE DATABASE {};").format(sql.Identifier(db_name2)))
        else:
            print("DB already exists")
finally:
    admin_conn.close()

# --- 2. SQLAlchemy engine for the output database ----------------------------
# Use the same OUT_* server and credentials as `connect`, so pandas writes the
# tables to the server where the ALTER TABLE statements below run. (Rebuilding
# the URL from `db_params` reused the IN_* host and credentials.) The password is
# percent-encoded so characters such as '@' or '/' cannot break the URL.
conn_str = "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(
    user=config["OUT_USER"],
    password=quote(config["OUT_PWD"], safe=""),
    host=config["OUT_HOST"],
    port=config["OUT_PORT"],
    database=db_name2,
)
engine = create_engine(conn_str)


def get_type(x):
    """Map a column name to a PostgreSQL column type name.

    Not called anywhere in this notebook: the tables are created by
    ``DataFrame.to_sql``, which picks the column types itself.
    """
    if x == "timestamp" or x == "delay":
        return "numeric"
    elif x == "value":
        return "double precision"
    else:
        return "varchar"


# --- 3. (Re)create the tables from `edf`, then add keys -----------------------
# `connect` returns an autocommit connection, so no explicit commit is needed.
conn = connect(db_name2)
cursor = conn.cursor()
try:
    for tablename, table_columns in tables.items():
        print(tablename)
        # IF EXISTS: a missing table is the normal first-run case.
        # CASCADE also removes foreign keys that other tables hold on this one
        # (e.g. ft_field_measurement -> dt_time/dt_field, created further below).
        # Without it, a re-run cannot drop or replace the dt_* tables.
        cursor.execute(sql.SQL("DROP TABLE IF EXISTS {} CASCADE;").format(sql.Identifier(tablename)))

        try:
            edf[table_columns].drop_duplicates().to_sql(tablename, engine, if_exists='replace', index=False)
        except Exception as e:
            print(e)
            print("FAIL to create {}".format(tablename))

    for statement in [
        "ALTER TABLE dt_time ADD PRIMARY KEY (timestamp);",
        "ALTER TABLE dt_field ADD PRIMARY KEY (field);",
        "ALTER TABLE dt_agent ADD PRIMARY KEY (agent);",
        "ALTER TABLE ft_measurement ADD PRIMARY KEY (timestamp, field, agent, type, owner);",
        "ALTER TABLE ft_measurement ADD FOREIGN KEY (timestamp) REFERENCES dt_time(timestamp);",
        "ALTER TABLE ft_measurement ADD FOREIGN KEY (field) REFERENCES dt_field(field);",
        "ALTER TABLE ft_measurement ADD FOREIGN KEY (agent) REFERENCES dt_agent(agent);"
    ]:
        print(statement)
        try:
            cursor.execute(sql.SQL(statement))
        except Exception as e:
            # Best effort: report a key that cannot be added (e.g. duplicate values)
            # and continue with the remaining statements.
            print(e)
finally:
    # Release the connection even if a statement above raised.
    cursor.close()
    conn.close()

In [None]:
tables = {
    "ft_field_measurement": ["type-ext", "field", "owner", "timestamp", "value"]
}

# De-duplicate while keeping first-seen order, so the column order is deterministic.
columns = list(dict.fromkeys(col for table_columns in tables.values() for col in table_columns))
rdf = pd.concat([sdf[columns], idf[columns], wdf[columns]], ignore_index=True)
rdf = rdf.rename({"type-ext": "type"}, axis=1)

# Wide layout: one row per (field, timestamp, owner), one column per extended type.
# `pivot` raises if a (field, timestamp, owner, type) combination occurs twice.
pivoted_df = (
    rdf[["field", "timestamp", "type", "owner", "value"]]
    .pivot(index=["field", "timestamp", "owner"], columns="type", values="value")
    .reset_index()
)

# Drop columns with no data at all, fill gaps forward and then backward, and drop
# any row that is still incomplete. `.ffill()` / `.bfill()` replace the deprecated
# `fillna(method=...)` with identical semantics.
# NOTE(review): the fill runs over the whole frame, not per field, so values can
# carry over from one field's rows into the next field's rows. A per-field fill
# would be stricter. However, fields appear to have different sensor positions, so
# it would leave NaNs that the final dropna() turns into dropped rows. Confirm the
# intended semantics before changing it.
pivoted_df = pivoted_df.dropna(axis=1, how="all").ffill().bfill().dropna()


In [None]:
from urllib.parse import quote

from psycopg2 import sql

# Engine for the output database on the OUT_* server, so it matches `connect`.
# (Rebuilding the URL from `db_params` reused the IN_* host and credentials.)
# The password is percent-encoded so special characters cannot break the URL.
conn_str = "postgresql+psycopg2://{user}:{password}@{host}:{port}/{database}".format(
    user=config["OUT_USER"],
    password=quote(config["OUT_PWD"], safe=""),
    host=config["OUT_HOST"],
    port=config["OUT_PORT"],
    database=db_name2,
)
engine = create_engine(conn_str)

tablename = "ft_field_measurement"

# `connect` returns an autocommit connection, so no explicit commit is needed.
conn = connect(db_name2)
cursor = conn.cursor()
try:
    # IF EXISTS: a missing table is the normal first-run case, not an error.
    cursor.execute(sql.SQL("DROP TABLE IF EXISTS {};").format(sql.Identifier(tablename)))

    try:
        pivoted_df.drop_duplicates().to_sql(tablename, engine, if_exists='replace', index=False)
    except Exception as e:
        print(e)
        print("FAIL to create {}".format(tablename))

    for statement in [
        "ALTER TABLE ft_field_measurement ADD PRIMARY KEY (timestamp, field, owner);",
        "ALTER TABLE ft_field_measurement ADD FOREIGN KEY (timestamp) REFERENCES dt_time(timestamp);",
        "ALTER TABLE ft_field_measurement ADD FOREIGN KEY (field) REFERENCES dt_field(field);",
    ]:
        print(statement)
        try:
            cursor.execute(sql.SQL(statement))
        except Exception as e:
            # Best effort: report a key that cannot be added and continue.
            print(e)
finally:
    # Release the connection even if a statement above raised.
    cursor.close()
    conn.close()
