In [39]:
import numpy as np
import pandas as pd
import os
from datetime import date
import sqlalchemy
import pymysql
import re

import warnings    # to avoid warning during executions
warnings.filterwarnings("ignore")

In [40]:
# database connection with mariaDB using SQL Alchemy
def dbConn_sqlAlc():
    """Create a SQLAlchemy engine for the MariaDB dashboard database.

    Connection settings come from the environment variables ``DB_USER``,
    ``DB_PASSWORD``, ``DB_HOST``, ``DB_PORT`` and ``DB_NAME``. Each falls back
    to the value that used to be hard-coded here (root / password /
    127.0.0.1 / 3306 / data_dashboard), so existing setups keep working.
    Prefer setting the variables over relying on the default password.

    Returns
    -------
    sqlalchemy.engine.Engine
        Engine using the ``mariadb+mariadbconnector`` dialect.
    """
    from urllib.parse import quote_plus

    database_username = os.environ.get('DB_USER', 'root')
    database_password = os.environ.get('DB_PASSWORD', 'password')
    database_ip = '{0}:{1}'.format(os.environ.get('DB_HOST', '127.0.0.1'),
                                   os.environ.get('DB_PORT', '3306'))
    database_name = os.environ.get('DB_NAME', 'data_dashboard')
    # URL-encode the credentials: characters such as '@', ':' or '/' in a
    # password would otherwise be parsed as URL delimiters.
    database_url = 'mariadb+mariadbconnector://{0}:{1}@{2}/{3}'.format(
        quote_plus(database_username), quote_plus(database_password),
        database_ip, database_name)
    database_connection = sqlalchemy.create_engine(database_url)
    return database_connection

# database connection with MariaDB using the PyMySQL driver
def dBCon_Maria():
    """Open a PyMySQL DB-API connection to the MariaDB dashboard database.

    Settings come from the environment variables ``DB_HOST``, ``DB_PORT``,
    ``DB_USER``, ``DB_PASSWORD`` and ``DB_NAME``. Each falls back to the value
    that used to be hard-coded here (localhost / 3306 / root / password /
    data_dashboard), so existing setups keep working.

    Returns
    -------
    pymysql.connections.Connection
        Open connection; the caller is responsible for closing it.
    """
    connection = pymysql.connect(host=os.environ.get('DB_HOST', 'localhost'),
                                 port=int(os.environ.get('DB_PORT', '3306')),
                                 user=os.environ.get('DB_USER', 'root'),
                                 password=os.environ.get('DB_PASSWORD', 'password'),
                                 db=os.environ.get('DB_NAME', 'data_dashboard'))
    return connection


# connection = dBCon_Maria()
# cursor=connection.cursor()                                            
# database_connection = dbConn_sqlAlc()

In [49]:
# List all instances
def list_instances():
    """Return every row of the ``tbl_pbr_lookup`` table as a DataFrame."""
    engine = dbConn_sqlAlc()
    query = "SELECT * FROM tbl_pbr_lookup"
    return pd.read_sql(query, engine)

# Load the trial lookup table once; a later cell resolves file names from it.
all_instances = list_instances()
all_instances.head(n=20)

Unnamed: 0,instance,unit,species,start_date,end_date,remark,raw_file_name
0,1,6,1,1629468000,1633939195,missing values from 2021-10-3 2:56:55 to 2021-...,_1_FPC13_ChCal_1.csv
1,2,6,1,1634652000,1644488995,,_2_FPC13_ChCal_2.csv
2,3,6,11,1625227200,1628157595,,_3_FPC13_ThalaPs.csv
3,4,1,2,1614763800,1618392595,,_4_FPC14_chamu.csv
4,5,1,7,1625486400,1639564200,,_5_FPC14_RhoSa.csv
5,6,1,11,1618561800,1625228995,,_6_FPC14_thalaps.csv
6,7,4,2,1634914800,1646645395,,_7_FPC21_chamu.csv
7,8,4,9,1623051000,1626771600,,_8_FPC21_Techu.csv
8,9,3,3,1620216000,1634632200,,_9_FPC22_DiaLut.csv
9,10,5,7,1616427000,1626690600,,_10_FPC23_Rhosa.csv


In [44]:
# function to look up a trial in tbl_pbr_lookup and set the query parameters
def set_varables(trial_no):
    """Look up one trial in ``tbl_pbr_lookup`` and return its query parameters.

    Parameters
    ----------
    trial_no : int
        Value of the ``instance`` column identifying the trial.

    Returns
    -------
    tuple
        ``(species_id, trial_no, unit_id, start_date, end_date, parameter,
        category)``. The first five values are ints read from the lookup row.
        ``parameter`` (12) and ``category`` (6) are fixed values that
        ``pbr_preProcess`` uses to filter ``tbl_log_data``.

    Raises
    ------
    ValueError
        If ``tbl_pbr_lookup`` does not contain exactly one row with
        ``instance == trial_no``.
    """
    database_connection = dbConn_sqlAlc()
    tbl_lookup = pd.read_sql("SELECT * FROM tbl_pbr_lookup WHERE instance = ?",
                             database_connection, params=[trial_no])
    # int(Series) only works for a one-row result (and is deprecated in recent
    # pandas); check explicitly so a wrong trial number gives a clear error.
    if len(tbl_lookup) != 1:
        raise ValueError(
            'expected exactly one row in tbl_pbr_lookup for instance {0!r}, found {1}'
            .format(trial_no, len(tbl_lookup)))
    trial = tbl_lookup.iloc[0]
    species_id = int(trial['species'])
    trial_no = int(trial['instance'])
    unit_id = int(trial['unit'])
    start_date = int(trial['start_date'])
    end_date = int(trial['end_date'])
    # NOTE(review): the meaning of parameter 12 / category 6 in tbl_log_data is
    # not visible in this notebook; confirm before reusing elsewhere.
    parameter = 12
    category = 6
    return species_id, trial_no, unit_id, start_date, end_date, parameter, category


# Function for data extraction and preprocessing:
# reshapes the log data, adds trial/species/unit values and sets data types
def pbr_preProcess(trial_no):
    """Extract one trial's log data from ``tbl_log_data`` and shape it for export.

    The trial's unit, species and time window come from ``set_varables``.
    Rows of ``tbl_log_data`` for that unit/parameter/category inside
    ``[start_date, end_date]`` are pivoted to one column per ``message``. The
    columns are then renamed to short export names, and the values are cleaned
    and cast to compact dtypes.

    Parameters
    ----------
    trial_no : int
        ``instance`` value in ``tbl_pbr_lookup`` identifying the trial.

    Returns
    -------
    pandas.DataFrame
        Columns ``time_stamp, date_time, unit, trial_no, species`` followed by
        the renamed message columns. ``'Empty'``, ``'inf'`` and +/-inf values
        are replaced by NaN.
    """
    species_id, trial_no, unit_id, start_date, end_date, parameter, category = set_varables(trial_no)
    database_connection = dbConn_sqlAlc()
    tbl_log_calc = pd.read_sql(
        "SELECT * FROM tbl_log_data WHERE unit = ? AND parameter = ? AND category = ? AND time_epoch BETWEEN ? AND ?",
        database_connection,
        params=[unit_id, parameter, category, start_date, end_date])

    # One row per log timestamp, one column per message; missing combinations -> 0.
    # NOTE(review): pivot_table's default aggfunc is 'mean', so duplicate
    # entries for the same key are averaged, not summed - confirm this is intended.
    sum_daily = tbl_log_calc.pivot_table(index=['date_time', 'time_epoch', 'category', 'unit', 'parameter'],
                                         columns='message', values='old_value', fill_value=0)
    pbr_raw_tmp1 = sum_daily.reset_index().drop(columns=['category', 'parameter'])
    pbr_raw_tmp1.insert(loc=3, column='trial_no', value=trial_no)
    pbr_raw_tmp1.insert(loc=4, column='species', value=species_id)

    # Swap date_time and time_epoch so the epoch timestamp becomes the first column.
    cols = list(pbr_raw_tmp1.columns)
    a, b = cols.index('date_time'), cols.index('time_epoch')
    cols[b], cols[a] = cols[a], cols[b]
    pbr_raw_tmp1 = pbr_raw_tmp1[cols]

    # Log message text -> export column name (keys must match tbl_log_data.message exactly).
    column_renames = {
        'time_epoch': 'time_stamp', 'CD (g/l)': 'D_CD_gpl', 'CO2 to algae conversion': 'Co2_Conv',
        'Daily CO2 consumption(g)': 'D_CO2_g', 'Daily CO2 consumption(g/m)': 'D_CO2_gpm',
        'Daily CO2 conversion(g/g)': 'D_Co2_Conv_gpg', 'Daily Feed(L)': 'D_Feed_l', 'Daily Harvest(L)': 'D_Harvest_l',
        'Daily Harvested algae(g)': 'D_Harvest_Algae_g', 'Daily PAR(mol/m)': 'D_PAR_molpm', 'Daily efficiency (%)': 'D_eff_percent',
        'Daily productivity (g)': 'D_Productivity_g', 'Daily productivity (g/m)': 'D_Productivity_gpm',
        'PAR to algae conversion': 'PAR_2Algae_Conv', 'connected reactor surface (m)': 'R_Surface_m',
        'reactor biomassa (kg)': 'R_Biomass_kg', 'reactor volume (l)': 'R_Volume_l',
    }
    column_dtypes = {
        'time_stamp': 'int64', 'date_time': 'datetime64[ns]', 'unit': 'int16', 'trial_no': 'int16', 'species': 'int16',
        'D_CD_gpl': 'float32', 'Co2_Conv': 'float32', 'D_CO2_g': 'float32', 'D_CO2_gpm': 'float32', 'D_Co2_Conv_gpg': 'float32',
        'D_Feed_l': 'float32', 'D_Harvest_l': 'float32', 'D_Harvest_Algae_g': 'float32', 'D_PAR_molpm': 'float32', 'D_eff_percent': 'float32',
        'D_Productivity_g': 'float32', 'D_Productivity_gpm': 'float32', 'PAR_2Algae_Conv': 'float32', 'R_Surface_m': 'float32',
        'R_Biomass_kg': 'float32', 'R_Volume_l': 'float32',
    }
    # np.nan, not pd.np.nan: the pd.np alias was removed in pandas 2.0.
    return (
        pbr_raw_tmp1
        .rename(columns=column_renames)
        .assign(unit=unit_id)
        .replace('Empty', np.nan)
        .replace('inf', np.nan)
        .replace([np.inf, -np.inf], np.nan)
        .astype(column_dtypes)
    )

# Missing-value imputation by interpolating each column that contains NaNs
def missingValue_imputation(sum_hour_cleaned, method='linear', **interp_kwargs):
    """Fill NaNs by interpolation in every column that contains them.

    The frame is temporarily indexed by ``date_time``. Each column with at
    least one NaN is interpolated with ``Series.interpolate(method=method,
    **interp_kwargs)``, and the original column order is restored.

    The default ``method='linear'`` interpolates over row position and
    ignores the actual ``date_time`` spacing. With default settings, NaNs
    before a column's first valid value are left unfilled. Pass
    ``method='time'`` to weight by time, or ``method='spline', order=3``
    (requires SciPy) for spline interpolation.

    Parameters
    ----------
    sum_hour_cleaned : pandas.DataFrame
        Frame with a ``date_time`` column; it is not modified.
    method : str, default 'linear'
        Interpolation method forwarded to ``Series.interpolate``.
    **interp_kwargs
        Extra keyword arguments for ``Series.interpolate`` (e.g. ``order``).

    Returns
    -------
    pandas.DataFrame
        A new frame with interpolated values and the original column order.
        If no column has missing values, the input object itself is returned
        and a message is printed.
    """
    indexed = sum_hour_cleaned.set_index('date_time')
    null_cols = indexed.columns[indexed.isnull().any()].tolist()
    if not null_cols:
        print('No columns with missing values')
        return sum_hour_cleaned

    for pbr_col in null_cols:
        indexed[pbr_col] = indexed[pbr_col].interpolate(method=method, **interp_kwargs)

    # Bring date_time back as a column and restore the caller's column order.
    return indexed.reset_index(level=0)[list(sum_hour_cleaned.columns)]


# export the preprocessed data into the tbl_cumulative_per_day database table
def pbr_exporttoDB(sumHour_preprocessed, connection=None, table="tbl_cumulative_per_day"):
    """Insert every row of a preprocessed frame into a MariaDB table.

    Column names of the frame are used as the table's column names. NaN/NaT
    values are sent as SQL NULL, because a float NaN cannot be passed to MySQL
    as a value. All rows are sent with one ``executemany`` call and committed
    together; on any error the transaction is rolled back and the exception
    is re-raised.

    Parameters
    ----------
    sumHour_preprocessed : pandas.DataFrame
        Rows to insert.
    connection : DB-API connection, optional
        Connection to use. If omitted, one is opened with ``dBCon_Maria()`` and
        closed afterwards. A caller-supplied connection is left open.
    table : str, default 'tbl_cumulative_per_day'
        Target table. It is interpolated into the SQL, so it must be trusted.
    """
    cols = "`,`".join(str(col) for col in sumHour_preprocessed.columns)
    placeholders = ",".join(["%s"] * len(sumHour_preprocessed.columns))
    sql = "INSERT INTO `" + table + "` (`" + cols + "`) VALUES (" + placeholders + ")"
    records = [
        tuple(None if pd.isna(value) else value for value in row)
        for row in sumHour_preprocessed.itertuples(index=False, name=None)
    ]

    owns_connection = connection is None
    if owns_connection:
        connection = dBCon_Maria()
    try:
        cursor = connection.cursor()
        try:
            cursor.executemany(sql, records)
        finally:
            cursor.close()
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        if owns_connection:
            connection.close()


#pbr_exporttoDB(cum_Dayly_cleaned)
#cum_Dayly_cleaned = missingValue_imputation(tmp_CumDay_df)
#tmp_CumDay_df = pbr_preProcess(trial_no)

#### ================== SET VARIABLES ==================================

In [82]:
#List exported files
# Show the raw files already exported; a missing list means nothing was exported yet.
if os.path.exists('./exported_list.txt'):
    with open('./exported_list.txt', 'r') as f:
        print(f.read())
else:
    print('exported_list.txt not found - nothing exported yet')


_1_FPC13_ChCal_1.csv


In [83]:
# SET THE TRIAL NUMBER USED TO SELECT THE DATA FROM THE LOG
trial_no = 2
# Get the raw file name registered for this trial. Select positionally after
# filtering on `instance`: row labels follow the SELECT result order, which is
# not guaranteed to equal instance - 1.
matching_files = all_instances.query('instance == @trial_no')['raw_file_name']
if matching_files.empty:
    raise ValueError('instance {0} not found in tbl_pbr_lookup'.format(trial_no))
file_name = matching_files.iloc[0]

In [84]:
# load content of the export_list into list split by new line
# data = [line.strip() for line in open("exported_list.txt", 'r')]
# File names already exported; a missing list means nothing was exported yet.
if os.path.exists('./exported_list.txt'):
    with open('./exported_list.txt') as f:
        lines = f.read().splitlines()
else:
    lines = []

if file_name not in lines:
    tmp_CumDay_df = pbr_preProcess(trial_no)
    cum_Dayly_cleaned = missingValue_imputation(tmp_CumDay_df)
    try:
        pbr_exporttoDB(cum_Dayly_cleaned)
        # Record the export right after the DB insert, so a re-run cannot
        # insert the same trial twice even if the CSV copy below fails.
        with open("exported_list.txt", "a") as exported_list:
            exported_list.write("\n")
            exported_list.write(file_name)
        print('file is exported')

        _dir_path_day = '../../../dataExport/cumulativeDATA/Day'
        save_path = os.path.join(_dir_path_day, file_name)
        cum_Dayly_cleaned.to_csv(save_path, index=False)
    except Exception as e:
        # Best effort: report the failure (with its type) without stopping the notebook.
        print('export failed:', repr(e))

else:
    print('data is present in the db - check and verify the trial number in tbl_pbr_data')

file is exported
