In [1]:
from ETL.Pipeline import Pipeline, FlowConfig
from ETL.modules.DataSource import DataSourceConfig
from ETL.modules.Encoder import EncoderConfig
from ETL.modules.Loader import LoaderConfig
from ETL.modules.Utils import Utils
from datetime import datetime
import mysql.connector
import pandas as pd
import os
# Datasource config: built via DataSourceConfig.custom_source_config() and
# registered below under the ID 'MySQL'.
ds_config = (
    DataSourceConfig()
    .custom_source_config()
)

# Ingestion Function

def query_wp_post(feedback):
    """Ingest all ``wp_posts`` rows from MySQL into a DataFrame.

    Connection settings are read from the environment (``MYSQL_HOST``,
    ``MYSQL_USER``, ``MYSQL_PASSWORD``, ``MYSQL_DATABASE``). When a variable
    is unset, the previous hard-coded value is used, so existing runs behave
    the same while credentials no longer have to live in the notebook.

    Args:
        feedback: Dict-like state passed between runs. When rows are fetched,
            ``feedback['latest_date']`` is set (in place) to the newest
            ``post_date`` seen.

    Returns:
        A tuple ``(df, feedback)``. ``df`` holds ``post_title``,
        ``post_status`` and ``post_date``, plus a ``date_time_execution``
        column stamped with the current local time. ``df`` is an empty
        DataFrame if no rows were fetched, the connection was not
        established, or a ``mysql.connector.Error`` occurred. Such errors
        are printed, not raised.
    """
    df = pd.DataFrame()
    host = os.environ.get("MYSQL_HOST", "127.0.0.1")
    user = os.environ.get("MYSQL_USER", "root")
    password = os.environ.get("MYSQL_PASSWORD", "")
    database = os.environ.get("MYSQL_DATABASE", "mt-engineering")
    # Full load: the incremental filter on feedback['latest_date'] is disabled.
    # If it is re-enabled, pass the watermark as a bound parameter, e.g.
    #   cursor.execute(query + " where post_date > %s", (feedback['latest_date'],))
    # instead of formatting it into the SQL text.
    query = "Select post_title, post_status, post_date from wp_posts"
    connection = None
    try:
        connection = mysql.connector.connect(host=host, user=user, password=password, database=database)
        if connection.is_connected():
            cursor = connection.cursor()
            try:
                cursor.execute(query)
                column_names = [description[0] for description in cursor.description]
                results = cursor.fetchall()
            finally:
                # Release the cursor even when execute/fetch raises.
                cursor.close()
            if results:
                df = pd.DataFrame(results, columns=column_names)
                df["date_time_execution"] = datetime.now()
                # Series.max skips missing values, unlike builtin max().
                feedback['latest_date'] = df['post_date'].max()
        else:
            print(f"Not Connected to host: {host} - database: {database}")
    except mysql.connector.Error as err:
        print("Error: ", err)
    finally:
        # Close on every path so a failure mid-query does not leak the connection.
        if connection is not None and connection.is_connected():
            connection.close()
    print(feedback)
    return df, feedback

# Encoder config: source columns are paired positionally with destination
# columns (the loader output shows post_title arriving as meta_id).
# NOTE(review): 'meta_id' looks like a name carried over from the wp_postmeta
# example, but here it receives post titles. Confirm this is intended before
# renaming, since the destination CSV already uses this header.
ENCODER_COLUMN_MAP = {
    'post_title': 'meta_id',
    'post_status': 'POST_STATUS_UPPER',
}
cols_source = list(ENCODER_COLUMN_MAP.keys())
cols_dest = list(ENCODER_COLUMN_MAP.values())
en_config = EncoderConfig().custom_encoder_config(cols_source, cols_dest)

# Transformation Function

def upper_meta_value(df):
    """Upper-case the ``post_status`` column of ``df``.

    The frame is modified in place and the same object is returned, so the
    function works whether the caller uses the return value or not.

    Args:
        df: DataFrame with a string ``post_status`` column.

    Returns:
        The same DataFrame instance, with ``post_status`` upper-cased.
    """
    status_col = 'post_status'
    upper_values = df[status_col].str.upper()
    df[status_col] = upper_values
    return df

# Loader config: built via LoaderConfig.custom_loader() and registered below
# as 'Custom Function' together with append_to_file_csv.
ld_config = (
    LoaderConfig()
    .custom_loader()
)

def append_to_file_csv(df, path="./export/excel_export_test.csv"):
    """Append the rows of ``df`` to a CSV file, creating it if needed.

    Existing rows are read back and concatenated with ``df`` (aligned by
    column name), and the whole file is rewritten without the index.

    Args:
        df: Rows to append. An empty frame is a no-op, so an unreadable
            header-less CSV is never created.
        path: Destination CSV. The parent directory is created if missing.
    """
    print(df)
    print(path)
    if df.empty:
        return
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    if os.path.exists(path):
        existing = pd.read_csv(path)
        # Files written by earlier versions (index=True) carry the saved index
        # as 'Unnamed: N' columns. Without dropping them, one more would be
        # added on every append.
        legacy_index_cols = [c for c in existing.columns if str(c).startswith("Unnamed:")]
        existing = existing.drop(columns=legacy_index_cols)
        result = pd.concat([existing, df], ignore_index=True)
    else:
        result = df
    # index=False: the positional index carries no data, and writing it is
    # what produced the growing 'Unnamed' columns.
    result.to_csv(path, index=False)

# Control Config - Create Started date/time

date_string = "2023-11-01 00:00:00"
date_format = "%Y-%m-%d %H:%M:%S"
start_datetime = datetime.strptime(date_string, date_format)

# Element IDs. Each is used once to create the element and once to wire it
# into the flow; keeping them in one place stops the two copies drifting apart.
DATASOURCE_ID = 'MySQL'
ENCODER_ID = 'wp_postmeta_column'
INGESTION_ID = 'Ingestion from 127.0.0.1/mt-engineering'
CONTROL_ID = '30 Seconds Update'
PROCESS_ID = 'Upper Meta Value'
LOADER_ID = 'Custom Function'

# Pipeline Object Creation

pipeline = Pipeline("ETL Pipeline")

# Create Pipeline Elements

## Flow 1

pipeline.create_datasource(DATASOURCE_ID, ds_config)
pipeline.create_encoder(ENCODER_ID, en_config)
pipeline.create_ingestion(INGESTION_ID, query_wp_post)
# NOTE(review): the ID says "30 Seconds" but the arguments are 3 and 3600.
# Confirm what these parameters mean and rename the ID if it is stale.
pipeline.create_control(CONTROL_ID, 3, 3600, start_datetime)
# NOTE(review): this loads a serialized object from disk rather than using the
# upper_meta_value function defined above. If Utils.load_obj uses pickle, only
# load trusted files (unpickling can execute arbitrary code). Confirm whether
# passing upper_meta_value directly was intended.
pipeline.create_process(PROCESS_ID, [Utils().load_obj('./upper_meta_value.pkl')])
pipeline.create_loader(LOADER_ID, ld_config, append_to_file_csv)

# Flow Config

flow_config = FlowConfig()

flow1 = flow_config.create_flow(
    ID_datasource=DATASOURCE_ID,
    ID_encoder=ENCODER_ID,
    ID_ingestion=INGESTION_ID,
    ID_control=CONTROL_ID,
    ID_process=PROCESS_ID,
    ID_loader=LOADER_ID)

flow_config.add_flow(flow1)

# Set Pipeline flow

pipeline.set_flow(flow_config.get_flow_config())

# Pipeline Start

pipeline.start()

Linking Pipeline flows for ETL Pipeline
Creating Thread...
Wait for first execution at 2023-11-03 21:09:15 for Ingestion from 127.0.0.1/mt-engineering
Running Ingestion Ingestion from 127.0.0.1/mt-engineering...
{'latest_date': Timestamp('2023-10-16 13:41:59')}
Ingestion Ingestion from 127.0.0.1/mt-engineering successfully completed!
Running Transformation Upper Meta Value for Ingestion Ingestion from 127.0.0.1/mt-engineering...
Trasformation Upper Meta Value for ingestion Ingestion from 127.0.0.1/mt-engineering Done!
Loading data from Ingestion Ingestion from 127.0.0.1/mt-engineering...
            meta_id POST_STATUS_UPPER           post_date  \
0      Hello world!           PUBLISH 2023-10-07 13:37:25   
1       Sample Page           PUBLISH 2023-10-07 13:37:25   
2    Privacy Policy             DRAFT 2023-10-07 13:37:25   
3        Navigation           PUBLISH 2023-10-07 13:37:26   
4       Default Kit           PUBLISH 2023-09-14 04:08:08   
..              ...               ...  

In [1]:
import pandas as pd
from modules.Utils import Utils

# Load the exported fetch example into a DataFrame for inspection.
# NOTE(review): this cell imports Utils from `modules.Utils`, while the pipeline
# cell uses `ETL.modules.Utils`. Confirm which path matches this kernel's cwd.
example_json_path = 'export/mysql_fetch_example.json'
example_records = Utils().read_json(example_json_path)
df = pd.DataFrame(example_records)

df

Unnamed: 0,meta_id,post_id,meta_key,META_VALUE_UPPER
0,1,2,_wp_page_template,DEFAULT
1,2,3,_wp_page_template,DEFAULT
2,30,18,_wp_attached_file,WOOCOMMERCE-PLACEHOLDER.PNG
3,31,18,_wp_attachment_metadata,"A:6:{S:5:""WIDTH"";I:1200;S:6:""HEIGHT"";I:1200;S:..."
4,38,26,_elementor_edit_mode,BUILDER
...,...,...,...,...
2268,2648,6125,_elementor_css,"A:6:{S:4:""TIME"";I:1697463737;S:5:""FONTS"";A:1:{..."
2269,2649,1583,_elementor_css,"A:6:{S:4:""TIME"";I:1697463792;S:5:""FONTS"";A:1:{..."
2270,2650,1476,_elementor_css,"A:6:{S:4:""TIME"";I:1697463838;S:5:""FONTS"";A:0:{..."
2271,2651,1825,_elementor_css,"A:6:{S:4:""TIME"";I:1697463870;S:5:""FONTS"";A:1:{..."
