# eFarmersHub Data Analysis
eFarmersHub data are stored in `gds_database` for data visualization. There are 8 tables:
1. Income generating tables: Sale, Machine Rent & Advisory
2. Expenditure tables: Purchase, Processing, Expense
3. User table: This table stores all the user data

The script utilizes `SQLAlchemy` as a database toolkit for CRUD operation while `Pandas` is used for data manipulation.

In [1]:
# Import Modules
# data manipulation and analysis
import pandas as pd
import numpy as np

# database toolkit
from sqlalchemy import create_engine, MetaData, inspect, Table, Column, Integer, String, Date, Numeric, extract
from sqlalchemy.engine.url import URL
from sqlalchemy.sql import select

# read env file
from dotenv import load_dotenv
import os

# path handling
from pathlib import Path

# logging
import logging

# z-score for anomaly detection
from scipy.stats import zscore

In [2]:
# load env variables
dotenv_path = Path("./.env")
load_dotenv(dotenv_path=dotenv_path)

USERNAME = os.getenv("USERNAME")
PASSWORD = os.getenv("PASSWORD")
HOST = os.getenv("HOST")
PORT = os.getenv("PORT")
DATABASE = os.getenv("DATABASE")

### 1. Read Sale Table
Sale data are stored in `gds_sale_transactions` table. For financial analysis such as revenue and profit, `net_amount` and `cogs_amount` are considered.

In [3]:
def extract_sale(engine):
    """
    read gds_sale_transactions table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    
    try:
        with engine.connect() as conn:
            query = """
                SELECT country_name, parent_name, user_type, user_region, user_name, user_id, market_type,
                    business_category, category, product, transaction_date, transaction_id, customer_id, customer_name,
                    customer_mobile, customer_gender, product_amount, net_amount, due_amount, cogs_amount, version,
                    currency_exchange_rate
                FROM (
                    SELECT distinct user_id, country_name, user_name, user_type, parent_name, transaction_id,
                        transaction_date, customer_id, customer_name, customer_mobile, customer_gender,
                        market_type, sale_type, business_category, category_id, category, product_id, product,
                        unit_type, attribute, quantity, unit_price, product_amount, sub_total_amount,
                        commission_amount, discount_amount, net_amount, paid_amount, due_amount, due_receivable_date,
                        version, user_join_date, user_region, customer_join_date, currency_exchange_rate, cogs_amount
                    FROM gds_database.gds_sale_transactions
                ) sale;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s",
            level=logging.ERROR)
        logging.error(e)
        
    return df

In [4]:
def transform_sale(df):
    """
    transform sale dataframe and returns df
    :param df: actual sale dataframe
    :return df: transformed dataframe
    """
    
    # drop duplicates
    df.drop_duplicates(inplace=True, ignore_index=True)
    
    # convert date_of_transaction to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    # convert user_id to string
    df["user_id"] = df["user_id"].astype(str)
    df["customer_id"] = df["customer_id"].astype(str)
    df["customer_mobile"] = df["customer_mobile"].astype(str)
    df["transaction_id"] = df["transaction_id"].astype(str)

    # convert and round numerical columns
    df["product_amount"] = df["product_amount"].astype(float)
    df["net_amount"] = df["net_amount"].astype(float)
    df["cogs_amount"] = df["cogs_amount"].astype(float)
    df["due_amount"] = df["due_amount"].astype(float)
    df["currency_exchange_rate"] = df["currency_exchange_rate"].astype(float)

    # renaming columns for consistency
    df.rename(columns={"net_amount" : "revenue"}, inplace=True)

    # add transaction_category column to identify the module used for transaction
    df["transaction_category"] = "Sale"

    # sorting data based on version and keep the latest version only
    df = df.sort_values(["country_name", "parent_name", "user_id", "transaction_id", "category", "product",
            "version"]) \
        .drop_duplicates(subset=["transaction_id", "category", "product"], keep="last")

    return df

### 2. Machine Rent
Machine Rent data are stored in `gds_machine_rent_transactions` table. For financial analysis such as revenue and profit, `net_amount` is considered.

**Note:** Depreciation is not being considered at the moment.

In [5]:
def extract_machine_rent(engine):
    """
    read gds_machine_rent_transactions table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    
    try:
        with engine.connect() as conn:
            query = """
                SELECT country_name, parent_name, user_type, user_region, user_name, user_id, business_category,
                    category, product, transaction_date, transaction_id, customer_id, customer_name,
                    customer_mobile, customer_gender, amount, net_amount, due_amount, version, currency_exchange_rate
                FROM (
                    SELECT distinct user_id, country_name, user_name, user_type, parent_name, transaction_id,
                        transaction_date, customer_id, customer_name, customer_mobile, customer_gender,
                        business_category, category_id, category, product_id, product, unit_type, quantity,
                        unit_price, unit_count, amount, sub_total_amount, net_amount, paid_amount, due_amount,
                        due_receivable_date, land_type, land_size, start_date_time, end_date_time, rent_hour,
                        version, user_join_date, user_region, customer_join_date, currency_exchange_rate
                    FROM gds_database.gds_machine_rent_transactions
                ) machine_rent;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s",
            level=logging.ERROR)
        logging.error(e)
        
    return df

In [6]:
def transform_machine_rent(df):
    """
    transform machine_rent dataframe and returns df
    :param df: actual machine_rent dataframe
    :return df: transformed dataframe
    """
    # drop duplicates
    df.drop_duplicates(inplace=True, ignore_index=True)

    # convert date_of_transaction to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    # convert user_id to string
    df["user_id"] = df["user_id"].astype(str)
    df["customer_id"] = df["customer_id"].astype(str)
    df["customer_mobile"] = df["customer_mobile"].astype(str)
    df["transaction_id"] = df["transaction_id"].astype(str)

    # convert and round numerical columns
    df["amount"] = df["amount"].astype(float)
    df["net_amount"] = df["net_amount"].astype(float)
    df["due_amount"] = df["due_amount"].astype(float)
    df["currency_exchange_rate"] = df["currency_exchange_rate"].astype(float)
    
    # renaming columns for consistency
    df.rename(columns={"net_amount" : "revenue",
        "amount": "product_amount"}, inplace=True)

    # add market_type column
    df["market_type"] = "Farmer"
    df["transaction_category"] = "Machinery Rental"

    # sorting data based on version and keep the latest version only
    df = df.sort_values(["country_name", "parent_name", "user_id", "transaction_id", "category", "product",
            "version"]) \
        .drop_duplicates(subset=["transaction_id", "category", "product"], keep="last")

    return df

### 3. Advisory Service
Advisory data are stored in `gds_advisory_transactions` table. For financial analysis such as revenue and profit, `amount` is considered.

In [7]:
def extract_advisory(engine):
    """
    read advisory table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    try:
        with engine.connect() as conn:
            query = """
                SELECT country_name, parent_name, user_type, user_region, user_name, user_id, business_categories,
                    categories, transaction_date, transaction_id, customer_id, customer_name, customer_mobile,
                        customer_gender, amount, version, currency_exchange_rate
                FROM (
                    SELECT distinct user_id, country_name, user_name, user_type, user_join_date, user_region,
                        parent_name, transaction_id, transaction_date, customer_id, customer_name, customer_mobile,
                        customer_gender, customer_join_date, categories_ids, business_categories, categories,
                        tags_ids, tags, comments, amount, usd_amount, version, currency_exchange_rate
                    FROM gds_database.gds_advisory_transactions
                ) advisory;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s",
            level=logging.ERROR)
        logging.error(e)
        
    return df

In [8]:
def transform_advisory(df):
    """
    transform advisory dataframe and returns df
    :param df: actual machine_rent dataframe
    :return df: transformed dataframe
    """
    
    # drop duplicates
    df.drop_duplicates(inplace=True, ignore_index=True)

    # convert date_of_transaction to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    # convert user_id to string
    df["user_id"] = df["user_id"].astype(str)
    df["customer_id"] = df["customer_id"].astype(str)
    df["customer_mobile"] = df["customer_mobile"].astype(str)
    df["transaction_id"] = df["transaction_id"].astype(str)

    # convert and round numerical columns
    df["amount"] = df["amount"].astype(float)
    df["currency_exchange_rate"] = df["currency_exchange_rate"].astype(float)
    
    # renaming columns for consistency
    df.rename(columns={"amount" : "revenue"}, inplace=True)

    # add market_type column
    df["market_type"] = "Farmer"
    df["transaction_category"] = "Advisory"
    df["business_category"] = "Advisory"
    df["category"] = "Advisory"

    # sorting data based on version and keep the latest version only
    df = df.sort_values(["country_name", "parent_name", "user_id", "transaction_id", "category",
        "version"]) \
        .drop_duplicates(subset=["transaction_id", "category"], keep="last")

    return df

### 4. Purchase
Purchase data are stored in `gds_purchase_transactions` table.

In [9]:
def extract_purchase(engine):
    """
    read purchase table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    try:
        with engine.connect() as conn:
            query = """
                SELECT country_name, parent_name, user_type, user_region, user_name, user_id, market_type,
                    business_category, category, product, transaction_date, transaction_id, supplier_id, supplier_name,
                    supplier_mobile, supplier_gender, product_amount, net_amount, due_amount, version,
                    currency_exchange_rate
                FROM (
                    SELECT distinct user_id, country_name, user_name, user_type, parent_name, transaction_id,
                        transaction_date, supplier_id, supplier_name, supplier_mobile, supplier_gender, market_type,
                        business_category, category_id, category, product_id, product, unit_type, attribute, quantity,
                        unit_price, product_amount, sub_total_amount, commission_amount, net_amount, paid_amount,
                        due_amount, due_payable_date, version, user_join_date, user_region, supplier_join_date,
                        currency_exchange_rate
                    FROM gds_database.gds_purchase_transactions
                ) purchase;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s", level=logging.ERROR)
        logging.error(e)
        
    return df

In [10]:
def transform_purchase(df):
    """
    transform purchase dataframe and returns df
    :param df: actual purchase dataframe
    :return df: transformed dataframe
    """
    # drop duplicates
    df.drop_duplicates(inplace=True, ignore_index=True)
    
    # convert transaction_date to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    # convert user_id to string
    df["user_id"] = df["user_id"].astype(str)
    df["supplier_id"] = df["supplier_id"].astype(str)
    df["supplier_mobile"] = df["supplier_mobile"].astype(str)
    df["transaction_id"] = df["transaction_id"].astype(str)

    # convert and round numerical columns
    df["net_amount"] = df["net_amount"].astype(float)
    df["due_amount"] = df["due_amount"].astype(float)   
    df["currency_exchange_rate"] = df["currency_exchange_rate"].astype(float)

    # renaming columns for consistency
    df.rename(columns={"net_amount" : "expenditure",
        "product_amount" : "product_expenditure"}, inplace=True)

    # add market_type column
    df["transaction_category"] = "Purchase"

    # sorting data based on version and keep the latest version only
    df = df.sort_values(["country_name", "parent_name", "user_id", "transaction_id", "category", "product",
            "version"]) \
        .drop_duplicates(subset=["transaction_id", "category", "product"], keep="last")

    return df

### 5. Processing
Processing data are stored in `gds_processing_transactions` table.

In [11]:
def extract_processing(engine):
    """
    read processing table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    try:
        with engine.connect() as conn:
            query = """
                SELECT country_name, parent_name, user_type, user_region, user_name, user_id, business_category,
                    category, product, transaction_date, transaction_id, amount, production_cost, version,
                    currency_exchange_rate
                FROM (
                    SELECT distinct user_id, country_name, user_name, user_type, parent_name, transaction_id,
                        transaction_date, business_category, category_id, category, product_id, product, unit_type,
                        quantity, unit_price, amount, production_cost, version, user_join_date, user_region,
                        currency_exchange_rate
                    FROM gds_database.gds_processing_transactions
                ) processing;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s", level=logging.ERROR)
        logging.error(e)
        
    return df

In [12]:
def transform_processing(df):
    """
    transform processing dataframe and returns df
    :param df: actual processing dataframe
    :return df: transformed dataframe
    """
    # drop duplicates
    df.drop_duplicates(inplace=True, ignore_index=True)
    
    # convert transaction_date to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    # convert user_id to string
    df["user_id"] = df["user_id"].astype(str)
    df["transaction_id"] = df["transaction_id"].astype(str)

    # convert and round numerical columns
    df["amount"] = df["amount"].astype(float)
    df["production_cost"] = df["production_cost"].astype(float)
    df["currency_exchange_rate"] = df["currency_exchange_rate"].astype(float)

    # renaming columns for consistency
    df.rename(columns={"production_cost" : "expenditure",
        "amount" : "product_expenditure"}, inplace=True)

    # add transaction category column
    df["market_type"] = df["user_type"]
    df["transaction_category"] = "Processing"

    # sorting data based on version and keep the latest version only
    df = df.sort_values(["country_name", "parent_name", "user_id", "transaction_id", "category", "product",
            "version"]) \
        .drop_duplicates(subset=["transaction_id", "category", "product"], keep="last")

    return df

### 6. Machine Purchase
Machine Purchase data are stored in `gds_machine_purchase_transactions` table.

In [13]:
def extract_machine_purchase(engine):
    """
    read machine_purchase table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    try:
        with engine.connect() as conn:
            query = """
                SELECT country_name, parent_name, user_type, user_region, user_name, user_id, business_category,
                    category, product, transaction_date, transaction_id, supplier_id, supplier_name,
                    supplier_mobile, supplier_gender, total_amount, due_amount, version,
                    currency_exchange_rate
                FROM (
                    SELECT distinct user_id, country_name, user_name, user_type, parent_name, transaction_id,
                        transaction_date, supplier_id, supplier_name, supplier_mobile, supplier_gender,
                        business_category, category_id, category, product_id, product, quantity, unit_price,
                        total_amount, paid_amount, due_amount, due_payable_date, version, user_join_date, user_region,
                        supplier_join_date, currency_exchange_rate
                    FROM gds_database.gds_machine_purchase_transactions
                ) machine_purchase;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s",
            level=logging.ERROR)
        logging.error(e)
        
    return df

In [14]:
def transform_machine_purchase(df):
    """
    transform machine_purchase dataframe and returns df
    :param df: actual machine_purchase dataframe
    :return df: transformed dataframe
    """
    # drop duplicates
    df.drop_duplicates(inplace=True, ignore_index=True)
    
    # convert transaction_date to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    # convert user_id to string
    df["user_id"] = df["user_id"].astype(str)
    df["supplier_id"] = df["supplier_id"].astype(str)
    df["supplier_mobile"] = df["supplier_mobile"].astype(str)
    df["transaction_id"] = df["transaction_id"].astype(str)

    # convert and round numerical columns
    df["total_amount"] = df["total_amount"].astype(float)
    df["due_amount"] = df["due_amount"].astype(float)
    df["currency_exchange_rate"] = df["currency_exchange_rate"].astype(float)    

    # usd conversion
    df.rename(columns={"total_amount" : "expenditure"}, inplace=True)

    # add transaction category column
    df["market_type"] = df["user_type"]
    df["transaction_category"] = "Machinery Purchase"

    # sorting data based on version and keep the latest version only
    df = df.sort_values(["country_name", "parent_name", "user_id", "transaction_id", "category", "product",
            "version"]) \
        .drop_duplicates(subset=["transaction_id"], keep="last")

    return df

### 7. Expense
Expense data are stored in `gds_expense_transactions` table.

In [15]:
def extract_expense(engine):
    """
    read expense table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    try:
        with engine.connect() as conn:
            query = """
                SELECT country_name, parent_name, user_type, user_region, user_name, user_id, expense_category,
                    business_category, product_category, expense_type, transaction_date, transaction_id, total_amount,
                    version, currency_exchange_rate
                FROM (
                    SELECT distinct user_id, country_name, user_name, user_type, parent_name, transaction_id,
                        transaction_date, expense_category, expense_type, total_amount, business_category,
                        product_category_id, product_category, version, user_join_date, user_region,
                        currency_exchange_rate
                    FROM gds_database.gds_expense_transactions
                ) expense;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s",
            level=logging.ERROR)
        logging.error(e)
        
    return df

In [16]:
def transform_expense(df):
    """
    transform expense dataframe and returns df
    :param df: actual expense dataframe
    :return df: transformed dataframe
    """
    # drop duplicates
    df.drop_duplicates(inplace=True, ignore_index=True)
    
    # convert transaction_date to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    # convert user_id to string
    df["user_id"] = df["user_id"].astype(str)
    df["transaction_id"] = df["transaction_id"].astype(str)

    # convert and round numerical columns
    df["total_amount"] = df["total_amount"].astype(float)
    df["currency_exchange_rate"] = df["currency_exchange_rate"].astype(float)

    # revenue & profit
    df.rename(columns={"total_amount" : "expenditure",
        "product_category" : "category"}, inplace=True)

    # add transaction category column
    df["market_type"] = df["user_type"]
    df["transaction_category"] = "Expense"

    # sorting data based on version and keep the latest version only
    df = df.sort_values(["country_name", "parent_name", "user_id", "transaction_id", "expense_category",
            "category", "expense_type", "version"]) \
        .drop_duplicates(subset=["transaction_id", "expense_category", "category", "expense_type"],
            keep="last")

    return df

### 8. Data Denormalization
Concatanate all the transactional dataframe. 

In [17]:
def data_denormalize(sale, machine_rent, advisory, purchase, machine_purchase, processing, expense):
    """
    denormalize all the transactions i.e. concatanate or compile all the transactional tables into one
    master table
    :param sale: sale dataframe
    :param machine_rent: machine_rent dataframe
    :param advisory: advisory dataframe
    :param purchase: purchase dataframe
    :param machine_purchase: machine_purchase dataframe
    :param processing: processing dataframe
    :param expense: expense dataframe
    :return df: return master data
    """
    # concat all cash-in transactions
    income = pd.concat([sale, machine_rent, advisory], sort=False, ignore_index=True)
    # usd conversion
    income["product_amount_usd"] = round(income["product_amount"] / income["currency_exchange_rate"], 4)
    income["revenue_usd"] = round(income["revenue"] / income["currency_exchange_rate"], 4)
    income["cogs_amount_usd"] = round(income["cogs_amount"] / income["currency_exchange_rate"], 4)
    # data cleaning for outliers
    income_anomaly = income.loc[(income["revenue_usd"] > 10000) & (income["cogs_amount_usd"] > 10000)] 
    income = income.loc[(income["revenue_usd"] <= 10000) & (income["cogs_amount_usd"] <= 10000)]

    # concat all cash-out transactions
    expenditure = pd.concat([purchase, machine_purchase, processing], sort=False, ignore_index=True)
    # usd conversion
    expenditure["product_expenditure_usd"] = round(expenditure["product_expenditure"] / 
        expenditure["currency_exchange_rate"], 4)
    expenditure["expenditure_usd"] = round(expenditure["expenditure"] / expenditure["currency_exchange_rate"], 4)
    # data cleaning for outliers
    expenditure_anomaly = expenditure.loc[(expenditure["expenditure_usd"] > 10000)]
    expenditure = expenditure.loc[expenditure["expenditure_usd"] <= 10000]

    # filter direct cost
    direct_cost = expense.loc[expense["expense_category"] == "Direct Cost"]
    # usd conversion
    direct_cost["direct_cost_usd"] = round(direct_cost["expenditure"] / direct_cost["currency_exchange_rate"], 4)
    # data cleaning for outliers
    direct_cost_anomaly = direct_cost.loc[(direct_cost["direct_cost_usd"] > 10000)]
    direct_cost = direct_cost.loc[direct_cost["direct_cost_usd"] <= 10000]

    # filter indirect cost
    indirect_cost = expense.loc[expense["expense_category"] == "Indirect Cost"]
    # usd conversion
    indirect_cost["indirect_cost_usd"] = round(indirect_cost["expenditure"] / indirect_cost["currency_exchange_rate"], 4)
    # data cleaning for outliers
    indirect_cost_anomaly = indirect_cost.loc[(indirect_cost["indirect_cost_usd"] > 10000)]
    indirect_cost = indirect_cost.loc[indirect_cost["indirect_cost_usd"] <= 10000]

    # concatenate direct and indirect cost
    expense = pd.concat([direct_cost, indirect_cost], sort=False, ignore_index=True)
    expense.drop(columns=["expense_type"], inplace=True)

    df = pd.concat([income, expenditure, expense], sort=False, ignore_index=True)
    # anomaly = pd.concat([income_anomaly, expenditure_anomaly, expense_anomaly], sort=False, ignore_index=True)
    anomaly = get_anomaly(income_anomaly, expenditure_anomaly, direct_cost_anomaly, indirect_cost_anomaly)
    df.drop(columns=["business_categories", "categories"], inplace=True)
    # anomaly.drop(columns=["business_categories", "categories"], inplace=True)
    
    return df, anomaly
    # return df

In [18]:
def get_finance(df):
    """
    from master_data table generate finance table
    :param df: master_data dataframe
    :return df: finance dataframe
    """

    df = df.loc[:, ["country_name", "parent_name", "user_type", "user_region", "user_name", "user_id",
        "business_category", "category", "transaction_date", "transaction_id", "transaction_category",
        "revenue_usd", "cogs_amount_usd", "expenditure_usd", "direct_cost_usd", "indirect_cost_usd"]]

    # convert transaction_date to datetime
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")
    
    df = df.groupby(["country_name", "user_type", "parent_name", "user_region", "user_name", "user_id",
        "transaction_category", "business_category", "category", "transaction_date"]) \
        .agg(revenue_usd=("revenue_usd", "sum"),
            cogs_amount_usd=("cogs_amount_usd", "sum"),
            expenditure_usd=("expenditure_usd", "sum"),
            direct_cost_usd=("direct_cost_usd", "sum"),
            indirect_cost_usd=("indirect_cost_usd", "sum"),
            transaction_count=("transaction_id", "nunique")).reset_index()
    
    return df

In [19]:
def get_user(df):
    """
    create user table from master_data
    :param df: master_data table
    :return df: user dataframe
    """

    df = df.loc[:, ["country_name", "parent_name", "user_type", "user_region", "user_name", "user_id",
        "transaction_date", "transaction_id", "customer_id", "customer_mobile", "customer_gender", "supplier_id",
        "supplier_mobile", "supplier_gender"]]

    df["transaction_date"] = pd.to_datetime(df["transaction_date"], format="%Y/%m/%d")

    df = df.dropna(subset=["customer_id"])

    df = df.groupby(["country_name", "parent_name", "user_type", "user_region", "user_name", "user_id",
        "transaction_date"]) \
        .agg(transaction_count=("transaction_id", "nunique"),
            mobile_count=("customer_mobile", "nunique"),
            gender_count=("customer_gender", "count"),
            customer_count=("customer_id", "nunique")).reset_index()

    return df

In [20]:
def get_anomaly(income, expenditure, direct_cost, indirect_cost):
    """
    create user table from master_data
    :param income: income anomaly df
    :param expenditure: expenditure anomaly df
    :param direct_cost: direct_cost anomaly df
    :param indirect_cost: indirect_cost anomaly df
    :return df: user dataframe
    """
    # process anomaly data of income
    income.drop(columns=["market_type", "business_category", "category", "product", "customer_id", "customer_name",
        "customer_mobile", "customer_gender", "product_amount", "revenue", "due_amount", "cogs_amount", "version",
        "currency_exchange_rate", "business_categories", "categories", "product_amount_usd", "cogs_amount_usd"],
        inplace=True)
    income.rename(columns={"revenue_usd" : "transaction_amount_usd"}, inplace=True)

    # process anomaly data of expenditure
    expenditure.drop(columns=["market_type", "business_category", "category", "product", "supplier_id", "supplier_name",
        "supplier_mobile", "supplier_gender", "product_expenditure", "expenditure", "due_amount", "version",
        "currency_exchange_rate", "product_expenditure_usd"],
        inplace=True)
    expenditure.rename(columns={"expenditure_usd" : "transaction_amount_usd"}, inplace=True)

    # process anomaly data of expense
    direct_cost.drop(columns=["market_type", "business_category", "category", "expenditure", "version",
        "currency_exchange_rate", "transaction_category", "expense_type"], inplace=True)
    direct_cost.rename(columns={"direct_cost_usd" : "transaction_amount_usd",
        "expense_category" : "transaction_category"}, inplace=True)

    indirect_cost.drop(columns=["market_type", "business_category", "category", "expenditure", "version",
        "currency_exchange_rate", "transaction_category", "expense_type"], inplace=True)
    indirect_cost.rename(columns={"indirect_cost_usd" : "transaction_amount_usd",
        "expense_category" : "transaction_category"}, inplace=True)

    df = pd.concat([income, expenditure, direct_cost, indirect_cost], sort=False, ignore_index=True)
    return df

In [None]:
def get_user_db(engine):
    """
    read expense table from sql database and returns df
    :param engine: SQLAlchemy engine object
    :return df: sale dataframe
    """
    try:
        with engine.connect() as conn:
            query = """
                SELECT *
                FROM gds_database.gds_users_information;
                """
            df = pd.read_sql(query, conn)
    except Exception as e:
        logging.basicConfig(filename="./log", filemode="a", format="%(asctime)s - %(levelname)s - %(message)s",
            level=logging.ERROR)
        logging.error(e)
        
    return df  

In [21]:
if __name__ == "__main__":
    # initiate connection to database
    connect_url = URL.create(
        "mysql+pymysql",
        username=USERNAME,
        password=PASSWORD,
        host=HOST,
        port=PORT,
        database=DATABASE
    )
    engine = create_engine(connect_url)

    # debug
    # with engine.connect() as conn:
    #     inspector = inspect(engine)
    #     table_names = inspector.get_table_names()
    #     print(table_names)

    print("compiling transactions")
    # sale
    sale = extract_sale(engine)
    sale = transform_sale(sale)
    # sale.to_csv("../output/sale.csv", index=False)

    # machine rent
    machine_rent = extract_machine_rent(engine)
    machine_rent = transform_machine_rent(machine_rent)
    # machine_rent.to_csv("../output/machine_rent.csv", index=False)

    # advisory
    advisory = extract_advisory(engine)
    advisory = transform_advisory(advisory)
    # advisory.to_csv("../output/advisory.csv", index=False)

    # purchase
    purchase = extract_purchase(engine)
    purchase = transform_purchase(purchase)
    # purchase.to_csv("../output/purchase.csv", index=False)

    # processing
    processing = extract_processing(engine)
    processing = transform_processing(processing)
    # processing.to_csv("../output/processing.csv", index=False)

    # machine purchase
    machine_purchase = extract_machine_purchase(engine)
    machine_purchase = transform_machine_purchase(machine_purchase)
    # machine_purchase.to_csv("../output/machine_purchase.csv", index=False)

    # expense
    expense = extract_expense(engine)
    expense = transform_expense(expense)
    # expense.to_csv("../output/expense.csv", index=False)

    print("denormalizing data")
    master_data, anomaly = data_denormalize(sale, machine_rent, advisory, purchase, machine_purchase, processing, expense)
    master_data.to_sql("master_data_global", con=engine, if_exists='replace', index = False)
    user_table = master_data.loc[:, ["country_name", "parent_name", "user_type", "user_region", "user_name", "user_id",
        "currency_exchange_rate"]]
    user_table = user_table.groupby(["country_name", "parent_name", "user_type", "user_region", "user_name", "user_id"]) \
        .agg(currency_exchange_rate=("currency_exchange_rate", "mean")).reset_index()
    anomaly.to_sql("anomaly_global", con=engine, if_exists='replace', index = False)

    print("create finance table")
    # financial table
    finance = get_finance(master_data)
    finance.to_sql("finance_global", con=engine, if_exists='replace', index = False)

    print("create user data")
    client = get_user(master_data)
    client.to_sql("client_global", con=engine, if_exists='replace', index = False)

compiling transactions
denormalizing data


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  direct_cost["direct_cost_usd"] = round(direct_cost["expenditure"] / direct_cost["currency_exchange_rate"], 4)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  indirect_cost["indirect_cost_usd"] = round(indirect_cost["expenditure"] / indirect_cost["currency_exchange_rate"], 4)


create finance table
create user data


In [57]:
master_user_parent = master_data[["parent_name"]]
master_user_parent.drop_duplicates(inplace=True)
master_user_parent.to_csv("parent_name.csv", index=False)

In [58]:
with engine.connect() as conn:
    query = """
        SELECT *
        FROM gds_database.gds_users_information;
        """
    parent_db = pd.read_sql(query, conn)

In [59]:
parent_db = parent_db[parent_db["user_type"] == "Franchisee"]
parent_db.rename(columns={"enterprise_name" : "parent_name"}, inplace=True)
parent_db.to_csv("parent_db.csv", index=False)

In [60]:
master_user_parent = master_user_parent.merge(parent_db[["parent_name", "user_id"]], how="inner", on="parent_name")
master_user_parent.to_csv("parent_name.csv", index=False)