In [None]:
import os
import psycopg2
import logging
import pandas as pd
from utils.exceptions import ETLError
from utils.postgre import get_pg_connection_string, open_pg_connection, close_pg_connection
from utils.ai import get_clean_timed_prediction
logger = logging.getLogger()

In [None]:
import os
pgserver = os.getenv("PGSERVER")
pgdatabase = os.getenv("PGDATABASE")
pgusername = os.getenv("PGUSERNAME")
pgpassword = os.getenv("PGPWD")

In [57]:
pg_conn_string = get_pg_connection_string()

In [None]:
def insert_trash_df(trash_data:pd.Series,cursor:object,connection:object):
    """Insert trash dataframe within PostGre DB

    Arguments:
        trash_data {pd.Series} -- the pd.Series row of a trash dataframe data
        cursor {object} -- the postgre cursor object created from connection
        connection {object} -- the postgre connection object

    Returns:
        row_id -- the new id of the row created for the trash within Trash table
    """
    campaign_id = trash_data['campaign_id']
    point = trash_data['the_geom'].wkt
    elevation = trash_data['Elevation']
    trash_type_id = int(trash_data['trash_type_id']) #int() casting to address single pd.Series insert use case
    timestamp = trash_data['Time']
    precision = 99
    cursor.execute("INSERT INTO campaign.trash (id, id_ref_campaign_fk,the_geom, elevation, id_ref_trash_type_fk,time,precision ) VALUES (DEFAULT,%s,ST_SetSRID(%s::geometry,2154),%s,%s,%s,%s) RETURNING id;", (campaign_id,point,elevation,trash_type_id,timestamp,precision))
    connection.commit()
    row_id = cursor.fetchone()[0]
    return row_id


def insert_logs_etl_df(log_data:pd.Series,cursor:object,connection:object):
    """Insert logs of ETL operation within logs.etl table

    Args:
        log_data (pd.Series): the input log data
        cursor {object} -- the postgre cursor object created from connection
        connection {object} -- the postgre connection object

    Returns:
        row_id -- the new id of the row created for the trash within Trash table
    """
    campaign_id = log_data['campaign_id']
    media_id = log_data['media_id']
    media_name = log_data['campaign_id']
    status = log_data['status']
    cursor.execute("INSERT INTO logs.etl (id, campaign_id,media_id,media_name,status ) VALUES (DEFAULT,%s,%s,%s,%s) RETURNING id;", (id,campaign_id,media_id,media_name,status))
    connection.commit()
    row_id = cursor.fetchone()[0]
    return row_id

In [53]:
campaign_id = 'b843c94c-256c-4a4c-877e-2e928f448d67'
media_id = '639990fc-1173-498b-96d5-7b46ec7485d3'
media_name = 'gopro.mp4'
status = 'failed'

In [24]:
log_dico_1 = {'campaign_id':'0','media_id':'0','media_name':'0','status':'0'}
log_dico_2 = {'campaign_id':'0','media_id':'0','media_name':'0','status':'0'}
log_list = []
log_list.append(log_dico_1)
log_list.append(log_dico_2)
log_list

[{'campaign_id': '0', 'media_id': '0', 'media_name': '0', 'status': '0'},
 {'campaign_id': '0', 'media_id': '0', 'media_name': '0', 'status': '0'}]

In [34]:
columns = ['campaign_id','media_id','media_name','status']
log_list_1 = [[campaign_id,media_id,media_name,status]]
pd.DataFrame(log_list_1,columns=columns)

Unnamed: 0,campaign_id,media_id,media_name,status
0,b843c94c-256c-4a4c-877e-2e928f448d67,639990fc-1173-498b-96d5-7b46ec7485d3,gopro.mp4,failed


In [87]:
def get_log_df(campaign_id:str,media_id:str,media_name:str,status:str='failed'):
    cols = ['campaign_id','media_id','media_name','status']
    log_data = [[campaign_id,media_id,media_name,status]]
    log_df = pd.DataFrame(log_data,columns=cols)
    return log_df

In [88]:
log_df = get_log_df(campaign_id,media_id,media_name)

In [98]:
def insert_log_etl_df(log_data:pd.Series,cursor:object,connection:object):
    """Insert logs of ETL operation within logs.etl table

    Args:
        log_data (pd.Series): the input log data
        cursor {object} -- the postgre cursor object created from connection
        connection {object} -- the postgre connection object

    Returns:
        row_id -- the new id of the row created for the trash within Trash table
    """
    campaign_id = log_data['campaign_id']
    media_id = log_data['media_id']
    media_name = log_data['media_name']
    status = log_data['status']
    cursor.execute("INSERT INTO logs.etl (id, campaign_id,media_id,media_name,initiated_on,finished_on,status ) VALUES (gen_random_uuid (),%s,%s,%s,now(),now(),%s) RETURNING id;",(campaign_id,media_id,media_name,status))
    connection.commit()
    row_id = cursor.fetchone()[0]
    return row_id

In [99]:
log_df.iloc[0]

campaign_id    b843c94c-256c-4a4c-877e-2e928f448d67
media_id       639990fc-1173-498b-96d5-7b46ec7485d3
media_name                                gopro.mp4
status                                       failed
Name: 0, dtype: object

In [127]:
pg_conn_string = get_pg_connection_string()
pg_connection = open_pg_connection(pg_conn_string)
pg_cursor = pg_connection.cursor()

In [101]:
row_id = insert_log_etl_df(log_df.iloc[0],pg_cursor,pg_connection)

In [102]:
row_id

'b06e92fa-129f-44d2-9828-3745ec19faf4'

In [96]:
def update_log_etl(row_id:str,cursor:object,connection:object):
    cursor.execute("UPDATE logs.etl SET status = 'success' WHERE id = %s RETURNING id;",(row_id,))
    connection.commit()
    row_id = cursor.fetchone()[0]
    return row_id

In [97]:
row_id = update_log_etl(row_id,pg_cursor,pg_connection)
row_id

'3a0b0fa3-72a6-4457-8b29-847b49b1293a'

In [132]:
blob_url = 'https://etlplasticostorageacc.blob.core.windows.net/gopro/gopro.mp4'
pg_cursor.execute("SELECT id,id_ref_campaign_fk,filename FROM campaign.media WHERE blob_url = 'https://etlplasticostorageacc.blob.core.windows.net/gopro/gopro.mp4'")
result = pg_cursor.fetchone()
pg_connection.commit()

In [142]:
def select_media_info(blob_url:str,cursor:object,connection:object):
    cursor.execute("SELECT id,id_ref_campaign_fk,filename FROM campaign.media WHERE blob_url = %s",(blob_url,))
    connection.commit()
    query_result = list(cursor.fetchone()) # cast tuple to list
    media_info = {'media_id':query_result[0],'campaign_id':query_result[1],'media_name':query_result[2]}
    return media_info

In [143]:
select_media_info(blob_url,pg_cursor,pg_connection)

{'media_id': '639990fc-1173-498b-96d5-7b46ec7485d3',
 'campaign_id': 'b843c94c-256c-4a4c-877e-2e928f448d67',
 'media_name': 'gopro.mp4'}