## Setup S3 connection

In [2]:
import glob
import boto3
import yaml
from datetime import datetime
from tqdm import tqdm

CREDENTIALS_PATH = '/home/eouser/Desktop/DEDL/credentials.yaml'

with open(CREDENTIALS_PATH, 'r') as f:
    credentials = yaml.safe_load(f)


def setup_S3(site_name):
    """Return a boto3 S3 client for one Destination Earth data-lake site.

    Parameters
    ----------
    site_name : str
        Site key (e.g. "central"). It selects both the endpoint
        ``https://s3.<site_name>.data.destination-earth.eu`` and the
        ``key`` / ``secret`` / ``region`` entry in ``credentials``.
    """
    site_credentials = credentials[site_name]
    session = boto3.Session(
        aws_access_key_id=site_credentials["key"],
        aws_secret_access_key=site_credentials["secret"],
        region_name=site_credentials["region"],
    )
    endpoint = f"https://s3.{site_name}.data.destination-earth.eu"
    return session.client('s3', endpoint_url=endpoint)


## Extract datadis data

In [3]:
# Shared S3 client for the "central" site, used by the download helper below.
s3 = setup_S3(site_name="central")

def move_s3_objects(source_bucket, destination_bucket, prefix='', client=None):
    """Download every object under ``prefix`` in ``source_bucket``.

    Despite its name this function does not move anything: the copy/delete
    step is disabled, so ``destination_bucket`` is currently unused. Each key
    is printed after it has been downloaded.

    Parameters
    ----------
    source_bucket : str
        Bucket to list and read from.
    destination_bucket : str
        Unused (kept for the disabled move step and for callers).
    prefix : str
        Only keys starting with this prefix are read.
    client : optional
        S3 client to use; defaults to the module-level ``s3`` client.

    Returns
    -------
    dict[str, bytes]
        Object key -> raw object contents. Empty (after printing
        "No files found.") when nothing matches, so callers can always
        ``dict.update`` with the result.
    """
    if client is None:
        client = s3

    contents = {}
    # A single list_objects_v2 call returns at most 1000 keys; the paginator
    # follows continuation tokens so large prefixes are read completely.
    pages = client.get_paginator('list_objects_v2').paginate(
        Bucket=source_bucket, Prefix=prefix
    )
    for page in pages:
        for obj in page.get('Contents', []):
            key = obj['Key']
            contents[key] = client.get_object(Bucket=source_bucket, Key=key)['Body'].read()
            print(key)

    if not contents:
        print("No files found.")
    return contents

# Download the 2021-10 Datadis consumption CSVs for each listed province into
# `files` (object key -> raw bytes).
source_bucket = "energy-storage"
destination_bucket = "datadis"
files = {}
for province in ['Barcelona', 'Girona', 'Lleida', 'Tarragona']:
    prefix = f"datadis/{province}/2021-10/"
    # `or {}`: move_s3_objects may return None when nothing matches the prefix,
    # and dict.update(None) would raise TypeError.
    files.update(move_s3_objects(source_bucket, destination_bucket, prefix) or {})


datadis/Barcelona/2021-10/consumption_08001.csv
datadis/Barcelona/2021-10/consumption_08002.csv
datadis/Barcelona/2021-10/consumption_08003.csv
datadis/Barcelona/2021-10/consumption_08004.csv
datadis/Barcelona/2021-10/consumption_08005.csv
datadis/Barcelona/2021-10/consumption_08006.csv
datadis/Barcelona/2021-10/consumption_08007.csv
datadis/Barcelona/2021-10/consumption_08008.csv
datadis/Barcelona/2021-10/consumption_08009.csv
datadis/Barcelona/2021-10/consumption_08010.csv
datadis/Barcelona/2021-10/consumption_08011.csv
datadis/Barcelona/2021-10/consumption_08012.csv
datadis/Barcelona/2021-10/consumption_08013.csv
datadis/Barcelona/2021-10/consumption_08014.csv
datadis/Barcelona/2021-10/consumption_08015.csv
datadis/Barcelona/2021-10/consumption_08016.csv
datadis/Barcelona/2021-10/consumption_08017.csv
datadis/Barcelona/2021-10/consumption_08018.csv
datadis/Barcelona/2021-10/consumption_08019.csv
datadis/Barcelona/2021-10/consumption_08020.csv
datadis/Barcelona/2021-10/consumption_08

## Transform datadis data

In [14]:
import io
import polars as pl
from datetime import timedelta

# Fresh accumulator for per-sector frames (filled by the grouping cell).
sector_dfs = {}

# Exploratory: parse only the first downloaded CSV.
filename, binary_content = list(files.items())[0]
file_like = io.BytesIO(binary_content)

# infer_schema_length=0 reads every column as a string; the hourly readings
# (the last 25 columns) are then cast to floats explicitly.
df = pl.read_csv(file_like, infer_schema_length=0)
hourly_columns = df.columns[-25:]
df = df.with_columns([pl.col(name).cast(pl.Float64) for name in hourly_columns])

# Drop the unnamed index column plus location/total fields not used downstream.
df = df.drop(['', 'community', 'province', 'municipality', 'sumEnergy'])
df

dataDay,dataMonth,dataYear,postalCode,fare,timeDiscrimination,measurePointType,sumContracts,tension,economicSector,distributor,mi1,mi2,mi3,mi4,mi5,mi6,mi7,mi8,mi9,mi10,mi11,mi12,mi13,mi14,mi15,mi16,mi17,mi18,mi19,mi20,mi21,mi22,mi23,mi24,mi25
str,str,str,str,str,str,str,str,str,str,str,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64
"""1""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,"""21""",,"""SERVICIOS""",,3411.0,3306.0,3252.0,3241.0,3230.0,3287.0,3688.0,4364.0,3480.0,3753.0,3938.0,3994.0,4027.0,3987.0,3932.0,3843.0,3890.0,4080.0,4045.0,3859.0,3360.0,3042.0,2819.0,2480.0,0.0
"""2""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,"""21""",,"""SERVICIOS""",,3441.0,3370.0,3304.0,3265.0,3249.0,3273.0,3424.0,3886.0,3981.0,4160.0,4305.0,4332.0,4341.0,4429.0,4436.0,4517.0,4845.0,4837.0,4799.0,4716.0,4558.0,4086.0,3696.0,3541.0,0.0
"""3""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,"""21""",,"""SERVICIOS""",,3371.0,3325.0,3266.0,3257.0,3245.0,3280.0,3398.0,3985.0,4013.0,4216.0,4381.0,4435.0,4442.0,4467.0,4491.0,4655.0,4839.0,4803.0,4623.0,4651.0,4315.0,3699.0,3626.0,3462.0,0.0
"""4""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,"""21""",,"""SERVICIOS""",,3297.0,3232.0,3178.0,3194.0,3190.0,3238.0,3535.0,4248.0,3368.0,3610.0,3648.0,3552.0,3529.0,3469.0,3441.0,3456.0,3516.0,3548.0,3574.0,3379.0,3141.0,2762.0,2417.0,2204.0,0.0
"""5""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,"""21""",,"""SERVICIOS""",,3179.0,3129.0,3117.0,3107.0,3077.0,3142.0,3371.0,4045.0,3184.0,3355.0,3431.0,3502.0,3661.0,3864.0,3845.0,3834.0,3789.0,3780.0,3768.0,3457.0,3007.0,2580.0,2354.0,2227.0,0.0
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""27""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,"""2266""",,"""SERVICIOS""",,997.0,872.0,812.0,789.0,782.0,781.0,831.0,961.0,1037.0,1236.0,1566.0,1724.0,1779.0,1801.0,1717.0,1642.0,1665.0,1719.0,1706.0,1868.0,1769.0,1528.0,1338.0,1162.0,0.0
"""28""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,"""2266""",,"""SERVICIOS""",,970.0,873.0,820.0,785.0,774.0,766.0,832.0,948.0,1026.0,1264.0,1567.0,1688.0,1772.0,1794.0,1711.0,1672.0,1719.0,1741.0,1775.0,1907.0,1769.0,1511.0,1335.0,1187.0,0.0
"""29""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,"""2267""",,"""SERVICIOS""",,1015.0,907.0,850.0,800.0,794.0,787.0,843.0,947.0,1029.0,1226.0,1560.0,1712.0,1783.0,1801.0,1721.0,1677.0,1690.0,1708.0,1729.0,1875.0,1772.0,1539.0,1375.0,1241.0,0.0
"""30""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,"""2267""",,"""SERVICIOS""",,1101.0,980.0,922.0,853.0,812.0,805.0,835.0,886.0,860.0,1027.0,1350.0,1543.0,1645.0,1683.0,1578.0,1556.0,1519.0,1539.0,1579.0,1745.0,1699.0,1515.0,1327.0,1188.0,0.0


In [22]:

# Reshape the hourly columns mi1..mi24 into long format: one row per (day, hour).
# `DataFrame.melt` is deprecated in polars >= 1.0 (see the warning this cell
# printed); `unpivot` is the replacement, with `index` for the id columns and
# `on` for the value columns.
# NOTE(review): mi25 is not unpivoted. In the sample output it is 0.0 on
# ordinary days; confirm whether it carries data on the 25-hour DST day.
id_columns = ['dataDay', 'dataMonth', 'dataYear', 'postalCode', 'fare',
              'timeDiscrimination', 'measurePointType', 'sumContracts',
              'tension', 'economicSector', 'distributor']
df = df.unpivot(
    index=id_columns,
    on=[f"mi{i}" for i in range(1, 25)],
    variable_name="hour_datadis",
    value_name="Consumption",
)

# Cast contract counts to float and derive a zero-padded hour label from the
# column name (mi1 -> "01", ..., mi24 -> "24").
df = df.with_columns([
    pl.col("sumContracts").cast(pl.Float64),
    pl.col("hour_datadis").str.extract(r"(\d+)", 1).str.zfill(2).alias("hour"),
])

# Calendar date from the year/month/day string parts.
df = df.with_columns(
    pl.concat_str([
        pl.col("dataYear"), pl.lit("-"),
        pl.col("dataMonth").str.zfill(2), pl.lit("-"),
        pl.col("dataDay").str.zfill(2),
    ]).str.strptime(pl.Date, "%Y-%m-%d").alias("date")
)

# Hour "24" is relabelled "00" and moved to the next calendar date.
mask = pl.col("hour") == '24'
df = df.with_columns([
    pl.when(mask).then(pl.lit("00")).otherwise(pl.col("hour")).alias("hour_correct"),
    pl.when(mask).then(pl.col("date") + pl.duration(days=1)).otherwise(pl.col("date")).alias("datetime"),
])
df


  df = df.melt(


dataDay,dataMonth,dataYear,postalCode,fare,timeDiscrimination,measurePointType,sumContracts,tension,economicSector,distributor,hour_datadis,Consumption,hour,date,hour_correct,datetime
str,str,str,str,str,str,str,f64,str,str,str,str,f64,str,date,str,date
"""1""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,21.0,,"""SERVICIOS""",,"""mi1""",3411.0,"""01""",2021-10-01,"""01""",2021-10-01
"""2""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,21.0,,"""SERVICIOS""",,"""mi1""",3441.0,"""01""",2021-10-02,"""01""",2021-10-02
"""3""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,21.0,,"""SERVICIOS""",,"""mi1""",3371.0,"""01""",2021-10-03,"""01""",2021-10-03
"""4""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,21.0,,"""SERVICIOS""",,"""mi1""",3297.0,"""01""",2021-10-04,"""01""",2021-10-04
"""5""","""10""","""2021""","""08001""",""">= 1 kV Y < 30 kV""","""GENERAL ALTA TENSIÓN""",,21.0,,"""SERVICIOS""",,"""mi1""",3179.0,"""01""",2021-10-05,"""01""",2021-10-05
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
"""27""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,2266.0,,"""SERVICIOS""",,"""mi24""",1162.0,"""24""",2021-10-27,"""00""",2021-10-28
"""28""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,2266.0,,"""SERVICIOS""",,"""mi24""",1187.0,"""24""",2021-10-28,"""00""",2021-10-29
"""29""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,2267.0,,"""SERVICIOS""",,"""mi24""",1241.0,"""24""",2021-10-29,"""00""",2021-10-30
"""30""","""10""","""2021""","""08001""","""BAJA TENSION y POTENCIA <= 15 …","""TARIFA DE TRES PERIODOS""",,2267.0,,"""SERVICIOS""",,"""mi24""",1188.0,"""24""",2021-10-30,"""00""",2021-10-31


In [32]:
# Zero-padded hour labels "00".."23".
hours = [f"{h:02d}" for h in range(24)]
hours[int("24") - 2]

'22'

In [21]:
# Switch the working frame to pandas.
# NOTE(review): despite its name, 'utc_hour' is a plain copy of the local
# 'hour_correct' label; no timezone conversion happens here.
df = df.to_pandas()
df = df.assign(utc_hour=df["hour_correct"])

In [6]:
import pandas as pd

# Demo: how naive local (CET/CEST) timestamps map to UTC around the
# spring-forward change on 2023-03-26, where local 02:00 is skipped.
# Uses its own variable so it does not overwrite the pipeline's `df`.
dst_demo = pd.DataFrame({'datetime': pd.to_datetime([
    '2023-03-26 00:00:00',
    '2023-03-26 01:00:00',
    '2023-03-26 03:00:00',
    '2023-03-26 04:00:00',
])})

# Localize to Europe/Berlin (CET/CEST), then convert to UTC.
dst_demo['datetime_utc'] = dst_demo['datetime'].dt.tz_localize('Europe/Berlin').dt.tz_convert('UTC')
dst_demo

             datetime              datetime_utc
0 2023-03-26 00:00:00 2023-03-25 23:00:00+00:00
1 2023-03-26 01:00:00 2023-03-26 00:00:00+00:00
2 2023-03-26 03:00:00 2023-03-26 01:00:00+00:00
3 2023-03-26 04:00:00 2023-03-26 02:00:00+00:00


In [None]:

# Build a timezone-aware 'time' column (Europe/Amsterdam) from the local
# calendar date ('datetime') and the hour label ('hour_correct').
#
# On the spring-forward day, local hour '02' does not exist, so localizing it
# raises. The previous version caught *any* error with a bare `except:` and
# then dropped every '03' row and relabelled every '02' as '03' across the
# whole frame. Here that workaround is applied only to the affected date(s).
# NOTE(review): the workaround keeps the value reported for hour '02' (as
# 03:00) and discards the '03' one; confirm this is the intended handling.
LOCAL_TZ = "Europe/Amsterdam"


def _naive_local_time():
    """Expression joining 'datetime' and 'hour_correct' into a naive Datetime."""
    return pl.concat_str([
        pl.col("datetime").dt.strftime('%Y-%m-%d'), pl.lit(" "),
        pl.col("hour_correct"), pl.lit(":00")
    ]).str.to_datetime()


# Dates that contain a local time inside a DST gap: non_existent='null' turns
# those times into nulls instead of raising.
gap_dates = (
    df.filter(
        _naive_local_time()
        .dt.replace_time_zone(LOCAL_TZ, ambiguous='latest', non_existent='null')
        .is_null()
    )
    .get_column("datetime")
    .unique()
    .to_list()
)

if gap_dates:
    on_gap_date = pl.col("datetime").is_in(gap_dates)
    df = df.filter(~(on_gap_date & (pl.col("hour_correct") == '03')))
    df = df.with_columns(
        hour_correct=pl.when(on_gap_date & (pl.col("hour_correct") == '02'))
        .then(pl.lit('03'))
        .otherwise(pl.col("hour_correct"))
    )

# Any remaining error now surfaces instead of being silently swallowed.
df = df.with_columns(
    _naive_local_time()
    .dt.replace_time_zone(LOCAL_TZ, ambiguous='latest')
    .alias("time")
)
df

In [None]:


# Keep only the grouping keys and the numeric measures.
df = df.drop([
    'dataDay', 'dataMonth', 'dataYear', 'fare', 'timeDiscrimination',
    'measurePointType', 'tension', 'distributor', 'hour', 'hour_correct',
    'datetime', 'date', 'hour_datadis',
])

# Total contracts and consumption per postcode, timestamp and sector.
df_grouped = df.group_by(['postalCode', 'time', 'economicSector']).agg([
    pl.sum('sumContracts'),
    pl.sum('Consumption'),
])

# Append each sector's rows to the running frames in `sector_dfs`.
# The group key is indexed with [0], i.e. treated as a 1-tuple.
for sector, df_group in df_grouped.group_by('economicSector'):
    sector_name = sector[0]
    sector_frame = df_group.drop(['economicSector'])
    existing = sector_dfs.get(sector_name)
    sector_dfs[sector_name] = (
        sector_frame if existing is None else pl.concat([existing, sector_frame])
    )

In [None]:
import psycopg2
import yaml
from tqdm import tqdm
import polars as pl
import pandas as pd
with open('/home/eouser/Desktop/DEDL/credentials.yaml', 'r') as f:
    c = yaml.safe_load(f)["postgres"]

df = sector_dfs["RESIDENCIAL"]

# Keyword arguments avoid hand-building a libpq DSN string, which breaks when
# a value (e.g. the password) contains spaces or quotes.
conn = psycopg2.connect(
    dbname=c['db_name'],
    user=c['db_user'],
    password=c['db_password'],
    host=c['db_host'],
    port=c['db_port'],
    sslmode='require',
)
cursor = conn.cursor()

# pandas dtype name -> PostgreSQL column type. The PostgreSQL name is
# TIMESTAMPTZ ("TIMESTAMPZ" is not a valid type).
dtype_map = {
    'object': 'VARCHAR',
    'int64': 'INTEGER',
    'float64': 'FLOAT',
    'datetime64[ns]': 'TIMESTAMPTZ',
    'datetime64[us]': 'TIMESTAMPTZ'
}


def check_table_exists(table_name):
    """Return True if ``public.<table_name>`` exists, False otherwise.

    The name is sent as a bound parameter rather than formatted into the SQL.
    (``SELECT *`` + ``[0]`` previously returned the catalog name, and raised
    TypeError when no row matched.)
    """
    cursor.execute(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = 'public' AND table_name = %s
        )
        """,
        (table_name,),
    )
    return cursor.fetchone()[0]


check_table_exists("residential_consumption")


In [None]:
import pandas as pd
import io
import polars as pl
from datetime import timedelta

# pandas version of the transform, run on a single downloaded file.
# NOTE(review): index 242 is a hard-coded pick from `files`; confirm which
# file this is meant to inspect.
filename, binary_content = list(files.items())[242]
print(filename)
file_like = io.BytesIO(binary_content)
df = pd.read_csv(file_like, index_col=False, dtype=str)

# Everything was read as str; the last 25 columns are the hourly readings.
hourly_columns = df.columns[-25:]
df[hourly_columns] = df[hourly_columns].astype(float)
df["sumContracts"] = df["sumContracts"].astype(float)
df = df.drop(['Unnamed: 0', 'community', 'province', 'municipality', 'sumEnergy'], axis=1)

df = df.melt(
    id_vars=['dataDay', 'dataMonth', 'dataYear', 'postalCode', 'fare',
             'timeDiscrimination', 'measurePointType', 'sumContracts', 'tension',
             'economicSector', 'distributor'],
    value_vars=[f"mi{i}" for i in range(1, 25)],
    var_name="hour",
    value_name="Consumption",
)
# Raw string: '\d' in a plain literal is an invalid escape (SyntaxWarning).
# expand=False returns a Series instead of a one-column DataFrame.
df["hour"] = df["hour"].str.extract(r'(\d+)', expand=False).str.zfill(2)

df['date'] = pd.to_datetime(df[['dataYear', 'dataMonth', 'dataDay']].agg('-'.join, axis=1))
# Hour "24" becomes "00" on the following date.
mask = df['hour'] == '24'
df.loc[mask, 'hour'] = '00'
df.loc[mask, 'date'] = df.loc[mask, 'date'] + timedelta(days=1)
df['time'] = pd.to_datetime(df["date"].dt.strftime('%Y-%m-%d') + ' ' + df['hour'] + ':00')
df = df.drop(['dataDay', 'dataMonth', 'dataYear', 'fare',
              'timeDiscrimination', 'measurePointType', 'tension', 'distributor',
              'hour', 'date'], axis=1)

df_grouped_pd = df.groupby(['postalCode', 'time', 'economicSector']).sum().reset_index()
# Separate name so this pass does not overwrite the polars `sector_dfs`
# accumulator used by the other cells.
sector_dfs_pd = {sector: group for sector, group in df_grouped_pd.groupby('economicSector')}
res = sector_dfs_pd['RESIDENCIAL']

In [None]:
from pytz import timezone,utc
# Localize a naive wall-clock time to Europe/Amsterdam with pytz, then convert
# to UTC. is_dst=None makes pytz raise on ambiguous or non-existent times.
tz = timezone('Europe/Amsterdam')
naive = datetime(2001, 2, 3, 10, 11, 12)
local_dt = tz.localize(naive, is_dst=None)
utc_dt = local_dt.astimezone(utc)
utc_dt

In [None]:
# True when the pandas `df` has no rows once converted to polars.
pl.from_pandas(df).height == 0

## Upload datadis data

In [None]:
import psycopg2
import yaml


with open('/home/eouser/Desktop/DEDL/credentials.yaml', 'r') as f:
    c = yaml.safe_load(f)["postgres"]

# Table to drop. The previous query, "DROP TABLE;", named no table and always
# failed with a syntax error. Leave this as None to make the cell a no-op;
# set it to the exact (case-sensitive) table name to drop that table.
TABLE_TO_DROP = None

if TABLE_TO_DROP is None:
    print("TABLE_TO_DROP is not set; nothing was dropped.")
else:
    from psycopg2 import sql

    # Separate names so the shared `conn`/`cursor` used by the upload helpers
    # are not replaced and closed by this cell.
    admin_conn = psycopg2.connect(
        dbname=c['db_name'],
        user=c['db_user'],
        password=c['db_password'],
        host=c['db_host'],
        port=c['db_port'],
    )
    try:
        admin_conn.autocommit = True  # DDL takes effect immediately
        with admin_conn.cursor() as admin_cursor:
            # Identifier quoting prevents the name being interpreted as SQL.
            admin_cursor.execute(
                sql.SQL("DROP TABLE IF EXISTS {}").format(sql.Identifier(TABLE_TO_DROP))
            )
        print(f"Dropped table {TABLE_TO_DROP!r} (if it existed).")
    finally:
        admin_conn.close()

In [None]:
# Display `res`, the RESIDENCIAL group produced by the pandas transform cell.
res

In [None]:

# pandas dtype name -> PostgreSQL column type used when creating tables.
# Dtypes not listed here are mapped to VARCHAR by the table-creation code.
dtype_map = {
    'object': 'VARCHAR',
    'int64': 'INTEGER',
    'float64': 'FLOAT',
    'datetime64[ns]': 'TIMESTAMP',
    'datetime64[us]': 'TIMESTAMP'
}


def check_table_exists(table_name, cur=None):
    """Return how many tables named ``table_name`` exist in schema ``public``.

    The count is 0 when the table is missing, so the result can be used
    directly as a boolean.

    Parameters
    ----------
    table_name : str
        Table name to look up (matched exactly against information_schema).
    cur : DB-API cursor, optional
        Cursor to run the query on; defaults to the module-level ``cursor``.
    """
    if cur is None:
        cur = cursor
    # The name is passed as a bound parameter instead of being formatted into
    # the SQL text, so quotes in it cannot alter the query.
    cur.execute(
        """
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema = 'public' AND table_name = %s
        """,
        (table_name,),
    )
    return cur.fetchone()[0]
    

def upload_data(df, table_name):
    """Insert the rows of a pandas DataFrame into a PostgreSQL table via psycopg2.

    If ``table_name`` does not exist yet, it is first created from ``df``'s
    columns. Column types come from ``dtype_map``; tz-aware datetimes become
    TIMESTAMPTZ and anything else VARCHAR. The primary key is
    ``(postalCode, time)``. The rows are then inserted, and rows whose key
    already exists are skipped (``ON CONFLICT ... DO NOTHING``).

    Uses the module-level ``cursor`` and ``conn`` and commits after each step.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to upload; must contain ``postalCode`` and ``time`` columns.
    table_name : str
        Target table in the ``public`` schema.
    """
    from psycopg2 import sql

    def ident(name):
        # Lower-case before quoting. PostgreSQL folds unquoted identifiers
        # (as previously emitted here) to lower case, so existing tables still
        # match, while quoting keeps arbitrary column names from injecting SQL.
        return sql.Identifier(str(name).lower())

    def pg_type(dtype):
        # tz-aware datetimes stringify as e.g. 'datetime64[ns, UTC]' and would
        # otherwise miss dtype_map.
        if isinstance(dtype, pd.DatetimeTZDtype):
            return 'TIMESTAMPTZ'
        return dtype_map.get(str(dtype), 'VARCHAR')

    key = sql.SQL(', ').join([ident('postalCode'), ident('time')])

    if not check_table_exists(table_name):
        # Column definitions come from `df` itself (not a global frame).
        column_defs = sql.SQL(', ').join(
            [sql.SQL('{} {}').format(ident(col), sql.SQL(pg_type(dtype)))
             for col, dtype in df.dtypes.items()]
        )
        cursor.execute(
            sql.SQL('CREATE TABLE IF NOT EXISTS {} ({}, PRIMARY KEY ({}))')
            .format(ident(table_name), column_defs, key)
        )
        conn.commit()

    # Always insert, including right after creating the table; insert columns
    # follow df's own column order.
    insert_query = sql.SQL(
        'INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO NOTHING'
    ).format(
        ident(table_name),
        sql.SQL(', ').join([ident(col) for col in df.columns]),
        sql.SQL(', ').join(sql.Placeholder() * len(df.columns)),
        key,
    )
    rows = list(df.itertuples(index=False, name=None))
    if rows:
        cursor.executemany(insert_query, rows)
        conn.commit()

# Load the residential export and upload it with the psycopg2 helper.
# NOTE(review): dtype=str makes every column map to VARCHAR on table creation.
df_res = pd.read_csv("residencial.csv", dtype=str)
upload_data(df_res, table_name="residential_consumption")

In [None]:
# Inspect `df_res`.
df_res

In [None]:
# Build "<name> <postgres type>" definitions for df_res, printing each column
# and its pandas dtype along the way.
columns = []
for name, kind in df_res.dtypes.items():
    print(name)
    print(kind)
    columns.append(f"{name} {dtype_map.get(str(kind), 'VARCHAR')}")

In [None]:
from sqlalchemy import create_engine, text
import yaml


from sqlalchemy.engine import URL

with open('/home/eouser/Desktop/DEDL/credentials.yaml', 'r') as f:
    c = yaml.safe_load(f)["postgres"]

# URL.create escapes special characters (e.g. '@', ':', '/') in the
# credentials; a hand-formatted connection URL breaks on them.
engine = create_engine(URL.create(
    "postgresql",
    username=c["db_user"],
    password=c["db_password"],
    host=c["db_host"],
    port=int(c["db_port"]),
    database=c["db_name"],
))


def check_table_exists(engine, table_name):
    """Return True if ``public.<table_name>`` exists in the database behind ``engine``.

    The name is sent as a bound parameter rather than formatted into the SQL.
    """
    query = text("""
        SELECT COUNT(*)
        FROM information_schema.tables
        WHERE table_schema = 'public' AND table_name = :table_name
    """)
    with engine.connect() as connection:
        return connection.execute(query, {"table_name": table_name}).scalar() > 0


def check_data_exists(engine, table_name, start_date, end_date):
    """Count rows of ``table_name`` whose ``time`` lies in [start_date, end_date].

    Returns the result row (a 1-tuple holding the count), as before.

    Parameters
    ----------
    engine : sqlalchemy Engine
    table_name : str
        Exact (case-sensitive) table name.
    start_date, end_date : str
        Bounds formatted like ``'%Y-%m-%d %H:%M:%S'``.
    """
    # Double embedded quotes so the name stays one quoted identifier; the
    # dates are sent as bound parameters instead of being formatted in.
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    query = text(f"""
        SELECT COUNT(*)
        FROM {quoted_table}
        WHERE time BETWEEN :start_date AND :end_date
    """)
    with engine.connect() as connection:
        return connection.execute(
            query, {"start_date": start_date, "end_date": end_date}
        ).fetchone()
    

def upload_data(df, table_name):
    """Upload one month of data to ``table_name`` through the SQLAlchemy ``engine``.

    - If the table does not exist, it is created from ``df`` (``to_sql`` with
      ``if_exists="replace"``).
    - Otherwise the rows already stored between the first and last ``time`` of
      ``df`` are counted. If that count is below ``end_date.day * 24``, the frame
      is appended; otherwise the month is reported as already present.

    Parameters
    ----------
    df : pandas.DataFrame or polars.DataFrame
        Must contain a ``time`` column of datetimes.
    table_name : str
        Target table name.
    """
    if isinstance(df, pd.DataFrame):
        df = pl.from_pandas(df)

    # min()/max() of an empty column are None; bail out before strftime fails.
    if df.is_empty():
        print("Nothing to upload: the DataFrame is empty.")
        return

    times = df.get_column("time")
    start_date = times.min()
    end_date = times.max()
    month = start_date.strftime('%B, %Y')

    def write(if_exists):
        df.to_pandas().to_sql(table_name, engine, chunksize=100000,
                              if_exists=if_exists, index=False, method="multi")

    if not check_table_exists(engine, table_name):
        write("replace")
        print(f"Monthly data for {month} uploaded successfully.")
        return

    existing_rows = check_data_exists(
        engine, table_name,
        start_date.strftime('%Y-%m-%d %H:%M:%S'),
        end_date.strftime('%Y-%m-%d %H:%M:%S'),
    )[0]
    # The original condition was missing its operator (a SyntaxError) and
    # compared against the whole result row; `>` on the count is assumed.
    # NOTE(review): the threshold covers a single postcode, and appending onto
    # a partially loaded month can duplicate rows (to_sql adds no primary key).
    if end_date.day * 24 > existing_rows:
        write("append")
        print(f"Monthly data for {month} appended successfully.")
    else:
        print(f"Data for {month} already exists in the table.")


# Upload the residential frame `res` with the SQLAlchemy-based helper.
upload_data(df=res, table_name="ResidentialConsumption")

In [11]:
# Reload the per-sector CSV exports with pandas' own dtype inference.
df_res = pd.read_csv("residencial.csv")
df_serv = pd.read_csv("services.csv")

In [None]:
# Inspect the pandas dtypes of `df_res`.
df_res.dtypes

In [None]:
# Number of distinct postcodes in `df`.
# NOTE(review): `df` may be a pandas or a polars frame depending on which cell ran last.
len(df["postalCode"].unique())