In [1]:
import sys
# Extend the module search path with the local utilities directory.
# NOTE(review): presumably this is where `data_ingestion` (imported below) lives — confirm.
sys.path.append('/path/to/utils')

In [2]:
import pandas as pd
from zipfile import ZipFile
import os
import requests
import numpy as np
from data_ingestion import DataIngestion
from datetime import datetime

In [3]:
# Unpack the assignment archive into the task-data directory.
path_to_extractfile = '/path/to/xtage_task_data/'
path_to_zipfile = '/path/to/xtage_task_data/Xtage - Assignment - Data Engineer - 2-4 Years - Mumbai.zip'
with ZipFile(path_to_zipfile, mode='r') as archive:
    archive.extractall(path_to_extractfile)


# Data Ingestion

## Create new databases to hold the raw files and the standardized data

In [4]:
# Connection settings for the server's default `postgres` database.
db_credentials_create = dict(
    host='rds-endpoint',
    port=5432,
    database='postgres',
    user='postgres',
    password='******',
)


In [6]:
# Initialize DataIngestion class with the bootstrap credentials, i.e. connected
# to the `postgres` database rather than to one of the project databases.
db_ing = DataIngestion(db_credentials_create)

Database engine created successfully


In [7]:
# NOTE(review): unclear from here whether the helper tolerates an already-existing
# database — confirm before relying on Restart & Run All.
db_ing.create_database("xtage_raw")  # to store raw files

Database 'xtage_raw' created successfully.


In [37]:
# NOTE(review): unclear from here whether the helper tolerates an already-existing
# database — confirm before relying on Restart & Run All.
db_ing.create_database("xtage_std") # to store standardized files

Database 'xtage_std' created successfully.


## Insert the raw data files into the database - together these act as one source of data

In [9]:
# Database credentials for the raw-data database (`xtage_raw`)
db_credentials = {
    'host': 'rds-endpoint',
    'port': 5432,
    'database': 'xtage_raw',
    'user': 'postgres',
    'password': '******'
}
# Initialize DataIngestion class against `xtage_raw`.
# Fix: this previously received `db_credentials_create`, which points at the
# default `postgres` database, so the raw tables were not created in `xtage_raw`.
ingestion = DataIngestion(db_credentials)

Database engine created successfully


In [10]:
# SQL commands to create tables
# DDL for the raw `products` table, keyed by Product_ID.
# IF NOT EXISTS makes the statement a no-op when the table is already present.
# NOTE(review): if the target is PostgreSQL (port 5432 suggests so), these unquoted
# mixed-case identifiers are stored lower-cased — confirm.
create_products_table_sql = """
CREATE TABLE IF NOT EXISTS products (
    Product_ID INT PRIMARY KEY,
    Product_Name VARCHAR(255),
    Category VARCHAR(255),
    Price FLOAT,
    Stock_Available INT
);
"""

# DDL for the raw `transactions` table, keyed by Transaction_ID.
# Customer_ID and Product_ID are plain INT columns: no FOREIGN KEY constraint is declared.
create_transactions_table_sql = """
CREATE TABLE IF NOT EXISTS transactions (
    Transaction_ID INT PRIMARY KEY,
    Customer_ID INT,
    Product_ID INT,
    Quantity INT,
    Transaction_Date DATE,
    Total_Amount FLOAT
);
"""

In [11]:
# Run the raw-table DDL statements in order: products first, then transactions.
for ddl_statement in (create_products_table_sql, create_transactions_table_sql):
    ingestion.execute_query(ddl_statement)

Query executed successfully.
Query executed successfully.


In [12]:
# Load each raw CSV file into its target table (file path, table name).
raw_csv_sources = (
    ('/path/to/xtage_task_data/products.csv', 'products'),
    ('/path/to/xtage_task_data/transactions.csv', 'transactions'),
)
for csv_path, target_table in raw_csv_sources:
    ingestion.insert_data_from_csv(csv_path, target_table)

Data from /home/ec2-user/xtage_task_data/products.csv inserted into products successfully
Data from /home/ec2-user/xtage_task_data/transactions.csv inserted into transactions successfully


In [14]:
# Read data from a CSV without inserting into the database
sales_data = ingestion.read_csv('/path/to/xtage_task_data/sales_data.csv')

# The API key is a secret and must not live in the notebook: read it from the
# environment and fail early, with a clear message, when it is missing.
# NOTE: the key that used to be hard-coded here should be considered leaked and rotated.
api_key = os.environ.get('XTAGE_API_KEY')
if not api_key:
    raise RuntimeError(
        "Environment variable XTAGE_API_KEY is not set; "
        "export the API key before running this cell."
    )

# API headers sent with every request to the mock server
api_headers = {
    'x-api-key': api_key,
    'Content-Type': 'application/json',
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36',
}

# Both endpoints are served by the same mock server
API_BASE_URL = 'https://ca746e27-0780-4511-a49c-e2ed1604cb41.mock.pstmn.io/api'

# Fetch data from external and internal APIs
exchange_rates = ingestion.fetch_data_from_api(API_BASE_URL + '/exchange_rates', api_headers)
exchange_rates_df = pd.DataFrame(exchange_rates)

customer_data = ingestion.fetch_data_from_api(API_BASE_URL + '/internal/customer_data', api_headers)
customer_data_df = pd.DataFrame(customer_data)

# Read back the tables loaded from the raw CSV files
products_df = ingestion.fetch_query_results('select * from products')

transactions_df = ingestion.fetch_query_results('select * from transactions')

CSV data from /home/ec2-user/xtage_task_data/sales_data.csv read successfully
Data fetched from API https://ca746e27-0780-4511-a49c-e2ed1604cb41.mock.pstmn.io/api/exchange_rates successfully
Data fetched from API https://ca746e27-0780-4511-a49c-e2ed1604cb41.mock.pstmn.io/api/internal/customer_data successfully


# Data Standardisation

In [15]:
# Normalise the column names of the CSV- and API-sourced frames to lower case.
for frame in (sales_data, exchange_rates_df, customer_data_df):
    frame.columns = [name.lower() for name in frame.columns]

In [17]:
# Replace empty / whitespace-only cells with NaN in every object-typed customer column.
for col_name in customer_data_df.columns:
    column = customer_data_df[col_name]
    if column.dtype != 'O':
        continue
    customer_data_df[col_name] = column.apply(
        lambda cell: cell if str(cell).strip() != '' else np.nan
    )

In [20]:
# Replace empty / whitespace-only cells with NaN in every object-typed product column.
for col_name in products_df.columns:
    column = products_df[col_name]
    if column.dtype != 'O':
        continue
    products_df[col_name] = column.apply(
        lambda cell: cell if str(cell).strip() != '' else np.nan
    )

In [23]:
# Replace empty / whitespace-only cells with NaN in every object-typed transaction column.
for col_name in transactions_df.columns:
    column = transactions_df[col_name]
    if column.dtype != 'O':
        continue
    transactions_df[col_name] = column.apply(
        lambda cell: cell if str(cell).strip() != '' else np.nan
    )

In [26]:
# Replace empty / whitespace-only cells with NaN in every object-typed sales column.
for col_name in sales_data.columns:
    column = sales_data[col_name]
    if column.dtype != 'O':
        continue
    sales_data[col_name] = column.apply(
        lambda cell: cell if str(cell).strip() != '' else np.nan
    )

In [29]:
# Replace empty / whitespace-only cells with NaN in every object-typed exchange-rate column.
for col_name in exchange_rates_df.columns:
    column = exchange_rates_df[col_name]
    if column.dtype != 'O':
        continue
    exchange_rates_df[col_name] = column.apply(
        lambda cell: cell if str(cell).strip() != '' else np.nan
    )

In [31]:
def standardize_date(date_str):
    """
    Parse a date value into a ``datetime.date``.

    String inputs are matched, in order, against the layouts ``YYYY-MM-DD``,
    ``DD/MM/YYYY`` and ``MM-DD-YYYY``; the first layout that parses wins.

    Args:
        date_str (str | datetime.date | datetime.datetime | None): Value to
            convert. Strings may carry leading/trailing whitespace. Values
            that are already ``date``/``datetime`` objects are accepted so the
            conversion can be re-applied to an already-converted column.

    Returns:
        datetime.date | None: The parsed date, or ``None`` when the value is
        neither a string nor a date, or matches none of the supported layouts.
    """
    from datetime import date  # local import: the notebook only imports `datetime`

    if isinstance(date_str, str):
        text = date_str.strip()
        for fmt in ("%Y-%m-%d", "%d/%m/%Y", "%m-%d-%Y"):
            try:
                return datetime.strptime(text, fmt).date()
            except ValueError:
                continue
        return None

    # `datetime` is a subclass of `date`, so it has to be tested first.
    if isinstance(date_str, datetime):
        return date_str.date()
    if isinstance(date_str, date):
        return date_str

    # None, NaN floats, numbers, ...: strptime would raise TypeError on these.
    return None

In [32]:
# Convert the date column of each frame; cells whose string form is 'nan' are left untouched.
date_columns = (
    (customer_data_df, 'date_joined'),
    (exchange_rates_df, 'date'),
    (sales_data, 'transaction_date'),
)
for frame, date_col in date_columns:
    frame[date_col] = frame[date_col].apply(
        lambda value: value if str(value) == 'nan' else standardize_date(value)
    )


In [38]:
# Connection settings for the standardized-data database (`xtage_std`)
db_credentials = dict(
    host='rds-endpoint',
    port=5432,
    database='xtage_std',
    user='postgres',
    password='******',
)

# Build the ingestion helper used for everything written to `xtage_std`
xtp_con = DataIngestion(db_credentials)

Database engine created successfully


In [39]:
# DDL for the standardized `customers` table, keyed by customer_id;
# customer_name is the only other mandatory column.
create_customers_std_table_sql = """
CREATE TABLE IF NOT EXISTS customers (
    customer_id INT PRIMARY KEY,
    customer_name VARCHAR(255) NOT NULL,
    age INT,
    gender VARCHAR(50),
    location VARCHAR(255),
    date_joined DATE
);
"""

# DDL for the standardized `products` table, keyed by product_id;
# unlike the raw table, product_name and category are NOT NULL here.
create_products_std_table_sql = """
CREATE TABLE IF NOT EXISTS products (
    product_id INT PRIMARY KEY,
    product_name VARCHAR(255) NOT NULL,
    category VARCHAR(255) NOT NULL,
    price FLOAT,
    stock_available INT
);
"""

# DDL for the standardized `transactions` table, keyed by transaction_id.
# customer_id / product_id are NOT NULL but no FOREIGN KEY constraint is declared.
create_transactions_std_table_sql = """
CREATE TABLE IF NOT EXISTS transactions (
    transaction_id INT PRIMARY KEY,
    customer_id INT NOT NULL,
    product_id INT NOT NULL,
    quantity INT,
    transaction_date DATE,
    total_amount FLOAT
);
"""

# DDL for the standardized `sales_data` table, keyed by transaction_id.
create_sales_std_table_sql = """
CREATE TABLE IF NOT EXISTS sales_data (
    transaction_id INT PRIMARY KEY,
    product_id INT NOT NULL,
    quantity INT,
    price FLOAT,
    transaction_date DATE
);
"""
    
# DDL for the standardized `exchange_rates` table.
# No PRIMARY KEY or UNIQUE constraint is declared, so duplicate rows are accepted.
create_exchng_std_table_sql = """
CREATE TABLE IF NOT EXISTS exchange_rates (
    currency_code VARCHAR(3) NOT NULL,
    exchange_rate FLOAT,
    date DATE
);
"""

In [40]:
# Run the standardized-table DDL statements, one after another, in this order.
std_ddl_statements = (
    create_sales_std_table_sql,
    create_exchng_std_table_sql,
    create_customers_std_table_sql,
    create_products_std_table_sql,
    create_transactions_std_table_sql,
)
for ddl_statement in std_ddl_statements:
    xtp_con.execute_query(ddl_statement)

Query executed successfully.
Query executed successfully.
Query executed successfully.
Query executed successfully.
Query executed successfully.


In [41]:
# Write each standardized frame to its table; the dict keeps the insertion order below.
std_frames_by_table = {
    'customers': customer_data_df,
    'sales_data': sales_data,
    'exchange_rates': exchange_rates_df,
    'products': products_df,
    'transactions': transactions_df,
}
for target_table, frame in std_frames_by_table.items():
    xtp_con.insert_data_from_df(df=frame, table_name=target_table)

Data from DataFrame inserted into customers successfully.
Data from DataFrame inserted into sales_data successfully.
Data from DataFrame inserted into exchange_rates successfully.
Data from DataFrame inserted into products successfully.
Data from DataFrame inserted into transactions successfully.


>To Export
>> Command to export database in .sql format ---- pg_dump -U postgres -p 5432 -d xtage_std -W -f '/path/to/store/xtage_std.sql'

> To connect - In Local
>> 1) install postgres,   
> 2) create role & password,  
> 3) create db and grant permission to a role,  
> 4) psql -U postgres -d xtage_std -h localhost -p 5432 -f '/path/to/read/xtage_std.sql'.  

> To upload the .sql file as a database, use the commands below (make sure your-database has already been created).   
>> psql -h your-rds-endpoint -U your-username -d your-database -f xtage_std.sql   
>> psql -h your-rds-endpoint -U your-username -d your-database -f /path/to/xtage_std.sql

>To connect to aws rds db instance using cli
>> psql --host=your-rds-endpoint --port=5432 --username=postgres --password --dbname=your-db-name 