---
---

<center><h1> Project: ETL Pipeline </h1></center>

----

* **ETL Pipeline 1:** Update User Summary Table
* **ETL Pipeline 2:** Update Transaction Summary Table
* **ETL Pipeline 3:** Update Valid Refund Table

----

We have already set up the simulation environment for our ETL project, so new data is continuously arriving in our MySQL tables.

<br>

In this notebook, we will create the required pipelines one by one and finally use the `schedule` library to schedule all three of them.

----

---

#### `Importing the required libraries`

---

In [1]:
# importing the required libraries
import schedule
import time
import pandas as pd
import mysql.connector as mysql
from datetime import timedelta
import datetime

---

***Define a function to create the connection with the MySQL database. `Configure the following cell as per your system settings.`***

---

In [2]:
def create_connection():
    # adjust these settings to match your MySQL server
    db = mysql.connect(
        host="localhost",
        user="lakshay",                       ## Enter your username here
        password="ABC@123",                   ## Enter your password here
        database="website",                   ## Enter your database name here
        auth_plugin="mysql_native_password",
    )
    return db

---
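Hardcoding the password in the notebook makes it easy to leak when the notebook is shared. Below is a minimal sketch of one alternative, assuming the credentials are exported as environment variables. The variable names `ETL_DB_HOST`, `ETL_DB_USER`, `ETL_DB_PASSWORD` and `ETL_DB_NAME` and the helper `connection_settings_from_env()` are hypothetical. The defaults mirror the values used in `create_connection()`, and the returned dictionary can be passed as `mysql.connect(**settings)`.

```python
import os


def connection_settings_from_env(environ=os.environ):
    """Build keyword arguments for mysql.connect() from environment variables.

    The variable names are hypothetical; rename them to match your setup.
    """
    return {
        "host": environ.get("ETL_DB_HOST", "localhost"),
        "user": environ["ETL_DB_USER"],          # required: KeyError if not set
        "password": environ["ETL_DB_PASSWORD"],  # required: keeps the secret out of the notebook
        "database": environ.get("ETL_DB_NAME", "website"),
        "auth_plugin": "mysql_native_password",
    }


# Usage against a real server (not run here):
#   import mysql.connector as mysql
#   db = mysql.connect(**connection_settings_from_env())
```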
---

<center><h1> ETL Pipeline 1 </h1></center>

---


![](images/pipeline-1.png)

---
---



#### `Define the Extraction Function`

---

* ***`extract_users_data()`***: Extracts the rows of the `users` table whose `signup_time` falls within a given time range and returns them as a dataframe.

<br>

* ***`Parameters Required`***:
    * `db`: the database connection object used to run the query.
    * `start_time` and `end_time`: the bounds of the time window used in the query.

<br>

* ***`Output`***: It will return the pandas dataframe of the extracted data.

---

In [3]:
def extract_users_data(db, start_time, end_time):
    
    # create database cursor
    cursor = db.cursor()
    
    print("Extracting signup results between {} and {}".format(str(start_time),str(end_time)))
    
    # select the users who signed up within the given time range;
    # the bounds are passed as query parameters instead of being formatted into the SQL string
    command = "SELECT * FROM users WHERE signup_time BETWEEN %s AND %s"
    
    # execute the command and fetch the results
    cursor.execute(command, (start_time, end_time))
    data = cursor.fetchall()
    cursor.close()
    
    # return the dataframe
    return pd.DataFrame.from_records(data, columns= ['user_id',
                                                     'user_email',
                                                     'user_name',
                                                     'source',
                                                     'is_prime',
                                                     'signup_time'])

---
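`BETWEEN` is inclusive at both ends, so a row stamped exactly at a window boundary can be returned by two consecutive runs. A half-open window (`signup_time >= start AND signup_time < end`) assigns every row to exactly one window. The sketch below illustrates this with Python's built-in `sqlite3` in-memory database standing in for MySQL, so it runs without a server. The cut-down two-column `users` table and the helper `extract_signups()` are hypothetical; with `mysql.connector` the placeholders would be `%s` instead of `?`.

```python
import sqlite3
from datetime import datetime, timedelta


def extract_signups(conn, start_time, end_time):
    """Return user_ids whose signup_time falls in the half-open window [start_time, end_time)."""
    cur = conn.execute(
        "SELECT user_id FROM users WHERE signup_time >= ? AND signup_time < ? ORDER BY user_id",
        (start_time.isoformat(sep=" "), end_time.isoformat(sep=" ")),
    )
    return [row[0] for row in cur.fetchall()]


# In-memory stand-in for the MySQL `users` table (only two columns kept).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (user_id INTEGER, signup_time TEXT)")
t0 = datetime(2021, 9, 28, 15, 0, 0)
rows = [(1, t0), (2, t0 + timedelta(minutes=5)), (3, t0 + timedelta(minutes=7))]
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(uid, ts.isoformat(sep=" ")) for uid, ts in rows],
)

# User 2 signed up exactly at 15:05, the boundary between the two windows,
# and is returned only by the second window.
first = extract_signups(conn, t0, t0 + timedelta(minutes=5))
second = extract_signups(conn, t0 + timedelta(minutes=5), t0 + timedelta(minutes=10))
```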

#### `Define the Transformation Function`

---

* ***`transform_user_data()`***: Takes the last 5 minutes of user data returned by `extract_users_data()` and applies the following transformations with pandas:
    * Replace the category `Not Available` with `Organic` in the `source` column.
    * Use `groupby` to count the users in each `source` category.
    * Use `groupby` to count the prime users in each `source` category.
    * Store all the results in a dictionary.
    * Add the start and end time to the dictionary and return it.
    
<br>    

* ***`Parameters Required`***:
    * `df_user`: dataframe of user data from the last 5 minutes.
    * `start_time` and `end_time`: stored in the output so that we know which time range the signup summary covers.

<br>

* ***`Output`***: Returns a dictionary that is loaded as a new row into the `signup_summary` table.

---

----

In [4]:
# define the transform function
def transform_user_data(df_user, start_time, end_time):
    
    print("Transforming User Data...")
    
    # replace "Not Available" with "Organic"
    # (assigning the result back works under pandas Copy-on-Write, unlike inplace=True on a column)
    df_user['source'] = df_user['source'].replace("Not Available", "Organic")
    
    # groupby to calculate the number of users in each category of source.
    source_count = df_user.groupby(['source'])['user_id'].count()
    
    # groupby to calculate the number of prime users in each category of source.
    prime_count  = df_user.groupby(['source'])['is_prime'].sum()
    
    # create dictionary of source count
    source_count_dict = source_count.to_dict()
    
    # append prime count in the same dictionary;
    # int() stores plain Python numbers, which the MySQL connector can send
    for key, value in prime_count.items():
        source_count_dict["prime_from_" + key] = int(value)
    
    # add start_time and end_time to the dictionary 
    source_count_dict['start_time'] = str(start_time)
    source_count_dict['end_time'] = str(end_time)
    
    # return the final dictionary
    return source_count_dict

---
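To check the transformation without a database, here is a self-contained sketch of the same steps on a hand-made dataframe. The source names and rows are made up, and `summarize_signups()` is a hypothetical variant that works on a copy (via `assign`) instead of modifying the caller's dataframe.

```python
import pandas as pd


def summarize_signups(df_user, start_time, end_time):
    """Same steps as transform_user_data(), without modifying df_user."""
    # work on a copy with "Not Available" mapped to "Organic"
    df = df_user.assign(source=df_user["source"].replace("Not Available", "Organic"))

    # number of users per source (int() keeps plain Python numbers)
    summary = {src: int(n) for src, n in df.groupby("source")["user_id"].count().items()}

    # number of prime users per source
    for src, n in df.groupby("source")["is_prime"].sum().items():
        summary["prime_from_" + src] = int(n)

    summary["start_time"] = str(start_time)
    summary["end_time"] = str(end_time)
    return summary


# Made-up signups: one "Not Available" source that becomes "Organic".
df_user = pd.DataFrame({
    "user_id": [1, 2, 3, 4],
    "source": ["Google", "Not Available", "Google", "Facebook"],
    "is_prime": [1, 0, 0, 1],
})
result = summarize_signups(df_user, "2021-09-28 15:00:00", "2021-09-28 15:05:00")
```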

#### `Define the Loading Function`

---

* ***`load_user_summary()`***: Takes the result dictionary from `transform_user_data()` and inserts it as a new row into the `signup_summary` table.

    
<br>    

* ***`Parameters Required`***:

    * `result_dict`: final results dictionary from the transform function.
    * `db`: the database connection object used to insert the row.

<br>

* ***`Output`***: It will not return anything.

---

In [5]:
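def load_user_summary(result_dict, db):
    print("Loading User Summary Table...")
    cursor = db.cursor()
    
    # build "INSERT INTO signup_summary(`col1`, `col2`, ...) VALUES (%s, %s, ...)";
    # backticks quote the column names and %s placeholders let the driver escape the values
    columns = ", ".join("`{}`".format(key) for key in result_dict)
    placeholders = ", ".join(["%s"] * len(result_dict))
    command = "INSERT INTO signup_summary({}) VALUES ({})".format(columns, placeholders)
    
    # convert numpy scalars (e.g. numpy.int64) to plain Python values, which the connector can send
    values = tuple(v.item() if hasattr(v, "item") else v for v in result_dict.values())
    cursor.execute(command, values)
    db.commit()
    cursor.close()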

---
---

#### `Define the ETL Pipeline`

<br>

Next, we define the pipeline function. It takes the database connection as its parameter and performs the following steps:

 * Define the time window for which we want to extract the data.
 * Extract the latest 5 minutes of user data.
 * Transform it to get the user signup summary.
 * Load the summary into the `signup_summary` table.

---

In [6]:

def pipeline_to_update_user_summary(db_object):
    
    # get the current time and time before 5 minutes
    current_time = datetime.datetime.now()
    current_minus_5 = current_time - datetime.timedelta(minutes=5)
    
    print("========================================================================================")
    print("Starting ETL to update user summary!!")
    
    ## EXTRACTION
    latest_user_data = extract_users_data(db = db_object,
                                          start_time = current_minus_5,
                                          end_time=current_time)
    
    ## TRANSFORMATION
    user_summary_data = transform_user_data(df_user= latest_user_data,
                                            start_time=current_minus_5, 
                                            end_time=current_time)
    
    ## LOADING
    load_user_summary(result_dict = user_summary_data,
                      db = db_object)
    
    print("Successfully loaded the data into user summary table !!")
    print("========================================================================================")

In [7]:
# let's execute pipeline 1
pipeline_to_update_user_summary(db_object=create_connection())

---
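In the *Schedule Pipelines* section at the end of this notebook, pipeline 1 is scheduled every 30 seconds while each run reads the last 5 minutes, so consecutive windows overlap and the same signup can be counted in several summary rows. One hypothetical way to avoid that is to start each window where the previous one ended. `WindowTracker` below is an illustrative helper, not part of the project. It keeps its state in memory, so after a restart it falls back to the initial look-back. It should be combined with a half-open `>= start AND < end` query so that rows stamped exactly at a boundary are not counted twice.

```python
from datetime import datetime, timedelta


class WindowTracker:
    """Hands out contiguous, non-overlapping [start, end) extraction windows.

    Hypothetical helper: the first window looks back `initial_lookback`;
    every later window starts where the previous one ended.
    """

    def __init__(self, initial_lookback):
        self.initial_lookback = initial_lookback
        self.last_end = None

    def next_window(self, now):
        if self.last_end is None:
            start = now - self.initial_lookback  # first run: fixed look-back
        else:
            start = self.last_end                # later runs: continue from the last end
        self.last_end = now
        return start, now


tracker = WindowTracker(initial_lookback=timedelta(minutes=5))
t = datetime(2021, 9, 28, 15, 0, 0)
w1 = tracker.next_window(t)
w2 = tracker.next_window(t + timedelta(seconds=30))
# In a pipeline: start_time, end_time = tracker.next_window(datetime.now())
```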

<center><h1> ETL Pipeline 2 </h1></center>

---


![](images/pipeline-2.png)




---

#### `Define the extraction functions`

---

* ***`extract_products_data()`***: It will read the products data from the CSV file. 

<br>

* ***`Parameters Required`***: `file_path`, the path to the products CSV file (it is in the `dataset` folder).

<br>

* ***`Output`***: It will return the pandas dataframe of the CSV file.



In [8]:
# define function to extract the product_data from the CSV file
def extract_products_data(file_path):
    return pd.read_csv(file_path)

---
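`transform_transaction_data()` relies on the `product_id`, `product_name` and `product_category` columns of the products file. Here is a minimal sketch of a stricter reader that fails fast when one of them is missing; `read_products()` is a hypothetical helper and the sample rows are made up.

```python
import io

import pandas as pd

# Columns that transform_transaction_data() reads from the products file.
REQUIRED_PRODUCT_COLUMNS = {"product_id", "product_name", "product_category"}


def read_products(file_or_path):
    """Read the products CSV and raise ValueError if a required column is missing."""
    df = pd.read_csv(file_or_path)
    missing = REQUIRED_PRODUCT_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"products file is missing columns: {sorted(missing)}")
    return df


# Made-up sample standing in for dataset/product_table.csv
sample = io.StringIO(
    "product_id,product_name,product_category\n"
    "101,Samsung Galaxy M31,Mobile\n"
    "102,LG 1.5 Ton Split,Air Conditioner\n"
)
products = read_products(sample)
```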

* ***`extract_transaction_data()`***: It will extract the data from the `transaction` table within a defined time range and return the dataframe.

<br>

* ***`Parameters Required`***:
    * `db`: the database connection object used to run the query.
    * `start_time` and `end_time`: the bounds of the time window used in the query.

<br>

* ***`Output`***: It will return the pandas dataframe of the extracted data.

---

In [9]:
def extract_transaction_data(db, start_time, end_time):
    
    # create database cursor
    cursor = db.cursor()
    
    print("Extracting transactions between {} and {}".format(str(start_time),str(end_time)))
    
    # select the transactions made within the given time range (parameterized query)
    command = "SELECT * FROM transaction WHERE transaction_time BETWEEN %s AND %s"
    
    # execute the command and fetch the results
    cursor.execute(command, (start_time, end_time))
    data = cursor.fetchall()
    cursor.close()
    
    # return the dataframe
    return pd.DataFrame.from_records(data, columns= ['transaction_id',
                                                     'user_id',
                                                     'product_id',
                                                     'transaction_time',
                                                     'price'])

---

#### `Define the Transformation Function`

---

* ***`transform_transaction_data()`***: Takes the transactions of the last 10 minutes returned by `extract_transaction_data()` together with the output of `extract_products_data()` and applies the following transformations with pandas:
    * Left-join the transaction data with the product data on `product_id`.
    * Take the first word of `product_name` as a new feature, `brand`.
    * Use `groupby` to count the transactions per brand and per category.
    * Use `groupby` to calculate the brand-wise sales.
    * Use `groupby` to calculate the category-wise sales.
    * Create a dictionary of the calculated results.
    * Add the start and end time to the dictionary.
    
<br>    

* ***`Parameters Required`***:
    * `df_transaction`: dataframe of transactions from the last 10 minutes.
    * `df_product`: dataframe of product data.
    * `start_time` and `end_time`: stored in the output so that we know which time range the transaction summary covers.

<br>

* ***`Output`***: Returns a dictionary that is loaded as a new row into the `transaction_summary` table.

---

----

In [10]:
# define the transform function
def transform_transaction_data(df_transaction, df_product, start_time, end_time):
    
    print("Transforming Transaction Data...")
    
    # merge the transaction and product dataframe.
    merged_df = df_transaction.merge(df_product, how='left', on='product_id')
    
    # take the first word of product_name as the brand
    merged_df['brand'] = merged_df['product_name'].apply(lambda x: x.split()[0])
    
    # calculate the brand count
    brand_count = merged_df.groupby(['brand'])['transaction_id'].count()
    
    # calculate the category count
    category_count = merged_df.groupby(['product_category'])['transaction_id'].count()
    
    # calculate the brand wise sales
    brand_wise_sales    = merged_df.groupby(['brand'])['price'].sum()
    
    # calculate the category wise sales
    category_wise_sales = merged_df.groupby(['product_category'])['price'].sum()
    
    # create dictionary of brand count
    brand_count_dict = brand_count.to_dict()
    
    # iterating with .items() yields plain Python numbers rather than numpy scalars
    # append category count to dictionary
    for key, value in category_count.items():
        brand_count_dict[key] = value
    
    # append brand wise sales to dictionary
    for key, value in brand_wise_sales.items():
        brand_count_dict["sales_from_" + key] = value
    
    # append category wise sales to dictionary
    for key, value in category_wise_sales.items():
        brand_count_dict["sales_from_" + key] = value
    
    # append start and end time to the dictionary
    brand_count_dict['start_time'] = str(start_time)
    brand_count_dict['end_time'] = str(end_time)
    
    return brand_count_dict

---
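The transformation can be tried on small hand-made frames. This sketch reproduces the same merge, brand split and `groupby` aggregations on made-up products and prices. It uses the vectorized `.str.split().str[0]` instead of `.apply(lambda x: x.split()[0])`. When a transaction's `product_id` is missing from the CSV, the left join leaves `product_name` as NaN and the lambda would raise `AttributeError`, whereas the `.str` accessor yields NaN and `groupby` drops that row from the brand and category statistics.

```python
import pandas as pd

# Made-up transactions; product 999 is deliberately missing from the products table.
transactions = pd.DataFrame({
    "transaction_id": [1, 2, 3, 4],
    "product_id": [101, 102, 101, 999],
    "price": [15000, 32000, 15000, 500],
})
products = pd.DataFrame({
    "product_id": [101, 102],
    "product_name": ["Samsung Galaxy M31", "LG 1.5 Ton Split"],
    "product_category": ["Mobile", "Air Conditioner"],
})

merged = transactions.merge(products, how="left", on="product_id")
# first word of the product name; NaN (unknown product) stays NaN instead of raising
merged["brand"] = merged["product_name"].str.split().str[0]

summary = {}
# transaction counts per brand and per category (NaN keys are dropped by groupby)
summary.update(merged.groupby("brand")["transaction_id"].count().to_dict())
summary.update(merged.groupby("product_category")["transaction_id"].count().to_dict())
# sales per brand and per category
for column in ("brand", "product_category"):
    for key, total in merged.groupby(column)["price"].sum().items():
        summary["sales_from_" + key] = total
```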

#### `Define the Loading Function`

---

* ***`load_transaction_summary()`***: Takes the result dictionary from `transform_transaction_data()` and inserts it as a new row into the `transaction_summary` table.

    
<br>    

* ***`Parameters Required`***:
    * `result_dict`: final results dictionary from the transform function.
    * `db`: the database connection object used to insert the row.

<br>

* ***`Output`***: It will not return anything.

---

In [11]:
def load_transaction_summary(result_dict, db):
    print("Loading Transaction Summary Table...")
    cursor = db.cursor()
    
    # build the INSERT statement for the transaction_summary table from the result dict.
    # Some column names contain a space ("Air Conditioner", "sales_from_Air Conditioner"),
    # so every column name is wrapped in backticks; values are passed as %s parameters.
    columns = ", ".join("`{}`".format(key) for key in result_dict)
    placeholders = ", ".join(["%s"] * len(result_dict))
    command = "INSERT INTO transaction_summary({}) VALUES ({})".format(columns, placeholders)
    
    # convert numpy scalars to plain Python values before sending them to MySQL
    values = tuple(v.item() if hasattr(v, "item") else v for v in result_dict.values())
    
    # execute the command
    cursor.execute(command, values)
    db.commit()
    cursor.close()

---
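Quoting every column name in backticks handles names with spaces, such as `Air Conditioner`, without patching the generated SQL string. This sketch runs such a dictionary-driven insert end to end against Python's `sqlite3` in-memory database, which also accepts MySQL-style backtick quoting. The trimmed-down `transaction_summary` schema and the `insert_summary_row()` helper are assumptions for illustration; with `mysql.connector` the placeholders would be `%s`. Column names that themselves contain a backtick are not handled.

```python
import sqlite3


def insert_summary_row(conn, table, row):
    """Insert one dict as a row, quoting every column name with backticks.

    Values go through placeholders, so the driver handles quoting and escaping.
    """
    columns = ", ".join(f"`{name}`" for name in row)
    placeholders = ", ".join("?" for _ in row)  # mysql.connector would use %s
    conn.execute(f"INSERT INTO `{table}` ({columns}) VALUES ({placeholders})", tuple(row.values()))
    conn.commit()


# Hypothetical, trimmed-down version of the transaction_summary table.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE transaction_summary ("
    "`Mobile` INTEGER, `Air Conditioner` INTEGER, "
    "`sales_from_Air Conditioner` INTEGER, start_time TEXT, end_time TEXT)"
)
insert_summary_row(conn, "transaction_summary", {
    "Mobile": 2,
    "Air Conditioner": 1,
    "sales_from_Air Conditioner": 32000,
    "start_time": "2021-09-28 14:57:15",
    "end_time": "2021-09-28 15:07:15",
})
stored = conn.execute(
    "SELECT `Air Conditioner`, `sales_from_Air Conditioner` FROM transaction_summary"
).fetchone()
```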
---

#### `Define the ETL Pipeline`

<br>

Next, we define the pipeline function. It takes the database connection and the products dataframe as parameters and performs the following steps:

 * Define the time window for which we want to extract the data.
 * Extract the latest 10 minutes of transaction data.
 * Transform it to get the transaction summary.
 * Load the summary into the `transaction_summary` table.

---

In [12]:
def pipeline_to_update_transaction_summary(db_object, products_data):
    # get the current time and time before 10 minutes
    current_time = datetime.datetime.now()
    current_minus_10 = current_time - datetime.timedelta(minutes=10)
    
    print("========================================================================================")
    print("Starting ETL to update transaction summary!!")
    
    ## EXTRACTION
    latest_transaction_data = extract_transaction_data(db = db_object,
                                                       start_time = current_minus_10,
                                                       end_time = current_time)
    
    ## TRANSFORMATION
    transaction_summary_data = transform_transaction_data(df_product = products_data,
                                                          df_transaction = latest_transaction_data,
                                                          start_time = current_minus_10,
                                                          end_time = current_time)
    ## LOADING
    load_transaction_summary(result_dict = transaction_summary_data,
                             db = db_object)
    
    print("Successfully loaded the data into transaction summary table !!")
    print("========================================================================================")

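In [13]:
# let's execute pipeline 2
pipeline_to_update_transaction_summary(db_object=create_connection(),
                                       products_data=extract_products_data('dataset/product_table.csv'))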

---
---


<center><h1> ETL Pipeline 3 </h1></center>

---


![](images/pipeline-3.png)



---

#### `Define the Extraction Functions`

---

* ***`extract_refund_data()`***: It will extract the data from the `refund_detail` table within the defined time range and return the dataframe.

* ***`Parameters Required`***:
    * `db`: the database connection object used to run the query.
    * `start_time` and `end_time`: the bounds of the time window used in the query.

* ***`Output`***: Returns a pandas dataframe of the extracted refund requests.

---

In [14]:
def extract_refund_data(db, start_time, end_time):
    
    # create database cursor
    cursor = db.cursor()
    
    print("Extracting refund between {} and {}".format(str(start_time),str(end_time)))
    
    # select the refund requests raised within the given time range (parameterized query)
    command = "SELECT * FROM refund_detail WHERE ticket_raise_time BETWEEN %s AND %s"
    
    # execute the command and fetch the results
    cursor.execute(command, (start_time, end_time))
    data = cursor.fetchall()
    cursor.close()
    
    # return the dataframe
    return pd.DataFrame.from_records(data, columns= ['ticket_id',
                                                     'user_name',
                                                     'transaction_id',
                                                     'transaction_amount',
                                                     'ticket_raise_time'])

---

* ***`extract_valid_refund_data()`***: Extracts only the `transaction_id` column from the `valid_refund` table for refunds raised within a given time range.

* ***`Parameters Required`***:
    * `db`: the database connection object used to run the query.
    * `start_time` and `end_time`: the bounds of the time window used in the query.

* ***`Output`***: Returns a list of the transaction IDs that have already been refunded.

---

In [15]:
def extract_valid_refund_data(db, start_time, end_time):
    
    # create database cursor
    cursor = db.cursor()
    
    print("Extracting valid refund data between {} and {}".format(str(start_time),str(end_time)))
    
    # select the transaction IDs of refunds already issued within the given time range
    command = "SELECT transaction_id FROM valid_refund WHERE ticket_raise_time BETWEEN %s AND %s"
    
    # execute the command and fetch the results
    cursor.execute(command, (start_time, end_time))
    data = cursor.fetchall()
    cursor.close()
    
    # each row is a one-element tuple; return the valid transaction IDs as a list
    return [row[0] for row in data]


---

#### `Define the Transformation Function`

---

* ***`transform_refund_data()`***: Uses the data from the `refund_detail`, `transaction` and `valid_refund` tables and applies the following transformations with pandas:
    * Merge the transaction data with the refund requests.
    * If the transaction ID of a request matches a transaction, it is a `valid refund request`.
    * Keep only one request per transaction ID, because a customer may ask for a refund multiple times for the same transaction.
    * If the transaction ID has already been refunded (it is in the `valid_refund` table), reject the request as `already_processed`.
    * If the transaction ID doesn't match any transaction, it is an `invalid refund request` (`transaction_id_not_matched`).
    * Return the `valid refund requests` as a list of tuples and the `invalid refund requests` as a dataframe.
    
<br>    

* ***`Parameters Required`***:
    * `df_refund`: dataframe of refund requests from the last 30 minutes.
    * `df_transactions`: dataframe of transactions from the last 48 hours.
    * `refund_issued_TID`: list of transaction IDs already refunded in the last 48 hours.

<br>

* ***`Output`***: Returns two objects: the `valid refund requests` (a list of tuples) and the `invalid refund requests` (a dataframe).

---

----

In [16]:
def transform_refund_data(df_refund, df_transactions, refund_issued_TID):
    
    print("Transforming Refund Data...")
    # merge the transactions data and the refund request
    # refund request is for the last 30 minutes
    # transactions data is for the last 48 hours
    # an inner join keeps only the transactions that have a refund request
    valid_transactions = df_transactions.merge(df_refund, how='inner', on='transaction_id')
    
    # keep only one row for each transaction id
    valid_transactions = valid_transactions.groupby(['transaction_id']).first().reset_index()
    
    # if the transaction id is available in refund issued, then it is a duplicate request
    # otherwise it is a final valid request
    valid_transactions_final = valid_transactions[~valid_transactions.transaction_id.isin(refund_issued_TID)]
    
    # select the columns for the final output.
    valid_transactions_final = valid_transactions_final[['ticket_id', 'transaction_id', 'user_id', 'price', 'ticket_raise_time']]
    
    # convert the dataframe into a list of tuples for executemany();
    # itertuples() yields plain Python values, which the MySQL connector can convert
    valid_transactions_final = [(row.ticket_id, row.transaction_id, row.user_id, int(row.price), str(row.ticket_raise_time))
                                for row in valid_transactions_final.itertuples(index=False)]
    
    # list of rejected refunds.
    # create a new column refund reject reason
    # if the transaction id doesn't match, mark it as `transaction_id_not_matched`.
    # if the request is duplicate, mark it as `already_processed`
    refund_rejected = df_refund.merge(df_transactions, how='left', on='transaction_id')
    refund_rejected['refund_reject_reason'] = None
    refund_rejected.loc[refund_rejected.product_id.isna(), 'refund_reject_reason'] = 'transaction_id_not_matched'
    refund_rejected.loc[refund_rejected.transaction_id.isin(refund_issued_TID), 'refund_reject_reason'] = 'already_processed'
    
    # select the final columns and return the result.
    refund_rejected = refund_rejected[refund_rejected.refund_reject_reason.notna()]
    refund_rejected = refund_rejected[['ticket_id', 'user_name', 'refund_reject_reason']]
    
    return valid_transactions_final, refund_rejected
    

---
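The classification rules can be checked on toy data. This sketch applies the same rules using `merge(..., indicator=True)`, whose `_merge` column marks requests with no matching transaction as `left_only`. `classify_refunds()` and the ticket and transaction IDs are hypothetical, and only the columns the rules need are kept. As in the notebook's version, a second request for the same transaction within one batch (ticket 12 here) is dropped from the valid list without being reported as rejected.

```python
import pandas as pd


def classify_refunds(df_refund, df_transactions, refund_issued_tid):
    """Split refund requests into valid transaction IDs and rejected tickets with a reason."""
    merged = df_refund.merge(df_transactions, how="left", on="transaction_id", indicator=True)
    unmatched = merged["_merge"] == "left_only"                 # no such transaction
    already = merged["transaction_id"].isin(refund_issued_tid)  # refund already issued

    # valid: matched, not yet refunded, one request per transaction_id
    valid = (merged[~unmatched & ~already]
             .drop_duplicates(subset="transaction_id", keep="first")["transaction_id"]
             .tolist())

    # rejected: "already_processed" is assigned last, so it wins over "not matched"
    rejected = merged.assign(refund_reject_reason=None)
    rejected.loc[unmatched, "refund_reject_reason"] = "transaction_id_not_matched"
    rejected.loc[already, "refund_reject_reason"] = "already_processed"
    rejected = rejected[rejected["refund_reject_reason"].notna()]
    return valid, rejected[["ticket_id", "refund_reject_reason"]]


refunds = pd.DataFrame({"ticket_id": [11, 12, 13, 14], "transaction_id": [1, 1, 2, 99]})
transactions = pd.DataFrame({"transaction_id": [1, 2, 3], "price": [100, 200, 300]})
valid, rejected = classify_refunds(refunds, transactions, refund_issued_tid=[2])
```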

#### `Define the Loading Function`

---

* ***`load_refund_data()`***: Inserts the valid refund requests returned by `transform_refund_data()` into the `valid_refund` table and writes the rejected requests to a CSV file.

    
<br>    

* ***`Parameters Required`***:
    * `valid_refunds`: final list of records to be added to the `valid_refund` table.
    * `rejected_requests`: dataframe of rejected requests, written to a CSV file in `output_folder`.
    * `db`: the database connection object.

<br>

* ***`Output`***: It will not return anything.

---

In [17]:
import os

def load_refund_data(valid_refunds, rejected_requests, db):
    print("Loading Valid Refunds Table...")
    cursor = db.cursor()
    # command to insert the records into the valid_refund table
    command = "INSERT INTO valid_refund(ticket_id, transaction_id, user_id, price, ticket_raise_time) values(%s, %s, %s, %s, %s)"
    cursor.executemany(command, valid_refunds)
    db.commit()
    cursor.close()
    # write the rejected requests to a timestamped CSV file in output_folder;
    # pd.datetime was removed in pandas 2.0, so use the datetime module, and
    # strftime keeps spaces and colons (invalid on Windows) out of the file name
    os.makedirs("output_folder", exist_ok=True)
    file_name = "rejected_request" + datetime.datetime.now().strftime("%Y%m%d_%H%M%S") + ".csv"
    rejected_requests.to_csv(os.path.join("output_folder", file_name))

---
---

#### `Define the ETL Pipeline`

<br>

Next, we define the pipeline function. It takes the database connection as its parameter and performs the following steps:

 * Define the time windows for which we want to extract the data.
 * Extract the refund requests raised in the last 30 minutes.
 * Extract the transactions of the last 48 hours.
 * Extract the transaction IDs of the refunds already issued in the last 48 hours.
 * Transform them to get the `valid refund requests` and `invalid refund requests`.
 * Load the valid requests into the `valid_refund` table and the invalid ones into a CSV file.

---

In [18]:
def pipeline_to_update_valid_refunds(db_object):
    
    current_time = datetime.datetime.now()
    current_minus_30 = current_time - datetime.timedelta(minutes = 30)
    current_minus_48 = current_time - datetime.timedelta(hours  = 48)
    
    print("========================================================================================")
    print("Starting ETL to update valid refunds data!!")
    
    ## EXTRACTION
    latest_refund_requests = extract_refund_data(db = db_object,
                                                 start_time = current_minus_30,
                                                 end_time = current_time)
    
    valid_transactions = extract_transaction_data(db = db_object,
                                                  start_time = current_minus_48,
                                                  end_time = current_time)
   
    refund_issued = extract_valid_refund_data(db = db_object,
                                              start_time = current_minus_48,
                                              end_time = current_time)
    
    ## TRANSFORMATION
    valid_refunds, refunds_rejected = transform_refund_data(df_refund=latest_refund_requests,
                                                            df_transactions=valid_transactions,
                                                            refund_issued_TID= refund_issued)
    
    ## LOADING
    load_refund_data(db = db_object,
                     valid_refunds = valid_refunds,
                     rejected_requests = refunds_rejected)
    
    print("Successfully loaded the data into valid refund table !!")
    print("========================================================================================")

In [19]:
# let's execute pipeline 3
pipeline_to_update_valid_refunds(db_object=create_connection())

---

<center><h1> Schedule Pipelines </h1></center>

---

![](images/pipeline-4.png)

---
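The `schedule` library does not catch exceptions raised inside a job: they propagate out of `schedule.run_pending()`, which would end the `while True` loop below and stop all three pipelines (for example after a lost MySQL connection). Here is a minimal sketch of a wrapper that logs the error and lets the next scheduled run try again. `catch_exceptions()` is a hypothetical helper modelled on the exception-handling pattern in the `schedule` documentation, and `flaky_pipeline()` only simulates a failing job. It would be used as `schedule.every(30).seconds.do(catch_exceptions(pipeline_to_update_user_summary), db_object=db_object)`.

```python
import functools
import traceback


def catch_exceptions(job):
    """Wrap a scheduled job so that an error is logged instead of stopping the loop."""
    @functools.wraps(job)
    def wrapper(*args, **kwargs):
        try:
            return job(*args, **kwargs)
        except Exception:
            traceback.print_exc()  # log to stderr; the next scheduled run will try again
            return None
    return wrapper


@catch_exceptions
def flaky_pipeline(db_object):
    # simulated failure standing in for a pipeline that loses its connection
    raise ConnectionError("lost connection to MySQL")


result = flaky_pipeline(db_object=None)
```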

In [20]:
#### Schedule Pipelines

db_object = create_connection()
products_data = pd.read_csv('dataset/product_table.csv')

# pipeline 1: every 30 seconds in this demo (300 seconds would match its 5-minute window)
schedule.every(30).seconds.do(pipeline_to_update_user_summary, db_object = db_object)
# pipeline 2: every 60 seconds in this demo (600 seconds would match its 10-minute window)
schedule.every(60).seconds.do(pipeline_to_update_transaction_summary, db_object = db_object, products_data = products_data)
# pipeline 3: every 80 seconds in this demo (1800 seconds would match its 30-minute window)
schedule.every(80).seconds.do(pipeline_to_update_valid_refunds, db_object = db_object)

Every 80 seconds do pipeline_to_update_valid_refunds(db_object=<mysql.connector.connection_cext.CMySQLConnection object at 0x7f82a02c7290>) (last run: [never], next run: 2021-09-28 15:07:35)

In [None]:
# start the scheduler
while True:
    schedule.run_pending()
    time.sleep(1)

Starting ETL to update user summary!!
Extracting signup results between 2021-09-28 15:01:46.553131 and 2021-09-28 15:06:46.553131
Transforming User Data...
Loading User Summary Table...
Successfully loaded the data into user summary table !!
Starting ETL to update transaction summary!!
Extracting transactions between 2021-09-28 14:57:15.737396 and 2021-09-28 15:07:15.737396
Transforming Transaction Data...
Loading Transaction Summary Table...
Successfully loaded the data into transaction summary table !!
Starting ETL to update user summary!!
Extracting signup results between 2021-09-28 15:02:16.894747 and 2021-09-28 15:07:16.894747
Transforming User Data...
Loading User Summary Table...
Successfully loaded the data into user summary table !!
Starting ETL to update valid refunds data!!
Extracting refund between 2021-09-28 14:37:36.534991 and 2021-09-28 15:07:36.534991
Extracting transactions between 2021-09-26 15:07:36.534991 and 2021-09-28 15:07:36.534991
Extracting valid refund data b