In [1]:
import pandas as pd
import random
from datetime import datetime
import time
import sqlite3
import plotly.express as px

In [2]:
# Catalogue the synthetic orders pick their product from.
products = ['Laptop', 'Mouse', 'Keyboard', 'Phone', 'Tablet']

# Build 50 random order records in one pass. Each record draws its random
# values in the same field order as the keys are listed below.
data = [
    {
        'order_id': random.randint(1000, 9999),
        'customer_id': 'C' + str(random.randint(100, 999)),
        'product': random.choice(products),
        'quantity': random.randint(1, 5),
        'price': round(random.uniform(500, 20000), 2),
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    }
    for _ in range(50)
]

# Materialise the records as a frame and persist them without the row index.
df = pd.DataFrame(data)
df.to_csv("retail_sales_data.csv", index=False)

In [3]:
df.head(10)

Unnamed: 0,order_id,customer_id,product,quantity,price,timestamp
0,9109,C490,Mouse,2,8776.83,2025-06-14 10:54:25
1,6012,C906,Laptop,3,10106.91,2025-06-14 10:54:25
2,4468,C309,Tablet,5,12869.12,2025-06-14 10:54:25
3,3894,C117,Laptop,1,7330.34,2025-06-14 10:54:25
4,3792,C741,Keyboard,2,3332.87,2025-06-14 10:54:25
5,6322,C540,Phone,3,7129.59,2025-06-14 10:54:25
6,4510,C740,Phone,1,16448.76,2025-06-14 10:54:25
7,6316,C217,Mouse,3,4660.3,2025-06-14 10:54:25
8,7830,C358,Tablet,3,7265.77,2025-06-14 10:54:25
9,3636,C943,Mouse,4,19419.39,2025-06-14 10:54:25


In [4]:
# Report how many missing values each column holds.
missing_per_column = df.isnull().sum()
print(missing_per_column)

# Line total for every order: units sold times unit price.
df['total_amount'] = df['quantity'].mul(df['price'])

# Parse the timestamp strings; values that cannot be parsed become NaT
# instead of raising (errors='coerce').
df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')


order_id       0
customer_id    0
product        0
quantity       0
price          0
timestamp      0
dtype: int64


In [5]:
# Revenue per product: sum the line totals within each product group, then
# turn the grouped result back into a flat two-column frame.
product_totals = df.groupby('product')['total_amount'].sum()
product_sales = product_totals.reset_index()
print(product_sales)

    product  total_amount
0  Keyboard     216403.54
1    Laptop     357870.10
2     Mouse     436274.45
3     Phone     316792.40
4    Tablet     386917.08


In [6]:
# Overall revenue is the sum of every order's line total.
line_totals = df['total_amount']
total_revenue = line_totals.sum()
print("Total Revenue:", total_revenue)

Total Revenue: 1714257.57


In [7]:
# Daily sales: tag each order with its calendar date, then total the revenue
# per date and flatten the result into a (date, total_amount) frame.
df['date'] = df['timestamp'].dt.date
revenue_by_date = df.groupby('date')['total_amount'].sum()
daily_sales = revenue_by_date.reset_index()
print(daily_sales)

         date  total_amount
0  2025-06-14    1714257.57


In [8]:
# Connect to a local SQLite DB
conn = sqlite3.connect("retail_data.db")
try:
    # Save cleaned data, replacing any sales_data table left by an earlier run.
    df.to_sql("sales_data", conn, if_exists="replace", index=False)

    # Verify insertion by reading a few rows back.
    query = "SELECT * FROM sales_data LIMIT 5"
    sales_preview = pd.read_sql(query, conn)
finally:
    # Release the database handle even if the write or the read-back fails;
    # the original cell left the connection open for the life of the kernel.
    conn.close()

# Last expression of the cell, so the notebook renders the preview table.
sales_preview

Unnamed: 0,order_id,customer_id,product,quantity,price,timestamp,total_amount,date
0,9109,C490,Mouse,2,8776.83,2025-06-14 10:54:25,17553.66,2025-06-14
1,6012,C906,Laptop,3,10106.91,2025-06-14 10:54:25,30320.73,2025-06-14
2,4468,C309,Tablet,5,12869.12,2025-06-14 10:54:25,64345.6,2025-06-14
3,3894,C117,Laptop,1,7330.34,2025-06-14 10:54:25,7330.34,2025-06-14
4,3792,C741,Keyboard,2,3332.87,2025-06-14 10:54:25,6665.74,2025-06-14


In [9]:
# Bar chart of revenue per product; text_auto='.2s' prints each bar's value
# on the bar using that format string.
fig = px.bar(
    product_sales,
    x='product',
    y='total_amount',
    title='Total Sales by Product',
    text_auto='.2s',
).update_layout(width=700, height=400)
fig.show()

In [10]:
# Preview the first rows of the per-day totals (5 is the default for head()).
daily_sales.head(n=5)


Unnamed: 0,date,total_amount
0,2025-06-14,1714257.57


In [11]:
# Line chart of revenue per calendar date, with a marker on every data point.
fig2 = px.line(
    daily_sales,
    x='date',
    y='total_amount',
    title='Daily Revenue Trend',
    markers=True,
).update_layout(width=700, height=400)
fig2.show()

In [12]:
import time

def simulate_real_time_feed(file_path='real_time_sales.csv', num_sales=5, delay_seconds=5):
    """Append randomly generated sale records to a CSV file, one at a time.

    Each record has a random five-digit ``order_id``, a ``customer_id`` of
    the form ``C###``, a product picked from a fixed list, a quantity of 1-3,
    a price between 1000 and 50000 rounded to two decimals, and the current
    local time as ``timestamp``. Every record is printed after it is written.

    Parameters
    ----------
    file_path : str
        CSV file to append to. It is created on the first write.
    num_sales : int
        Number of records to generate (default 5, the original fixed count).
    delay_seconds : float
        Pause between consecutive records (default 5, the original fixed
        pause). Pass 0 to write the records back to back.

    Returns
    -------
    None
    """
    import os
    from random import choice, randint, uniform
    from datetime import datetime

    products = ['Laptop', 'Mouse', 'Keyboard', 'Phone']
    for sale_index in range(num_sales):
        new_data = {
            'order_id': randint(10000, 99999),
            'customer_id': 'C' + str(randint(100, 999)),
            'product': choice(products),
            'quantity': randint(1, 3),
            'price': round(uniform(1000, 50000), 2),
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

        # Emit the column header only when the file is missing or empty, so
        # appended rows never repeat it and an empty pre-created file still
        # gets one. os.path replaces the non-public pd.io.common.file_exists.
        needs_header = not os.path.exists(file_path) or os.path.getsize(file_path) == 0

        sale_frame = pd.DataFrame([new_data])
        sale_frame.to_csv(file_path, mode='a', header=needs_header, index=False)

        print("New sale recorded:", new_data)

        # Pause between records only; sleeping after the final record would
        # just delay the caller.
        if delay_seconds > 0 and sale_index < num_sales - 1:
            time.sleep(delay_seconds)

# Run the simulator against its default file. The file is opened in append
# mode, so every run of this cell adds more rows to real_time_sales.csv.
simulate_real_time_feed(file_path='real_time_sales.csv')


New sale recorded: {'order_id': 30471, 'customer_id': 'C496', 'product': 'Mouse', 'quantity': 2, 'price': 40597.53, 'timestamp': '2025-06-14 10:54:26'}
New sale recorded: {'order_id': 72158, 'customer_id': 'C222', 'product': 'Keyboard', 'quantity': 3, 'price': 10792.05, 'timestamp': '2025-06-14 10:54:31'}
New sale recorded: {'order_id': 62924, 'customer_id': 'C845', 'product': 'Laptop', 'quantity': 2, 'price': 7823.81, 'timestamp': '2025-06-14 10:54:36'}
New sale recorded: {'order_id': 64504, 'customer_id': 'C394', 'product': 'Laptop', 'quantity': 3, 'price': 13156.55, 'timestamp': '2025-06-14 10:54:41'}
New sale recorded: {'order_id': 47196, 'customer_id': 'C669', 'product': 'Phone', 'quantity': 3, 'price': 17714.43, 'timestamp': '2025-06-14 10:54:46'}


In [13]:
def etl_pipeline(source_file='real_time_sales.csv', db_name='retail_data.db'):
    """Read a sales CSV, derive extra columns, and append it to SQLite.

    The whole of ``source_file`` is read and appended to the ``sales_data``
    table on every call, so calling this twice on the same file inserts its
    rows twice.

    Parameters
    ----------
    source_file : str
        CSV with at least ``timestamp``, ``quantity`` and ``price`` columns.
    db_name : str
        Path of the SQLite database file to append to.

    Returns
    -------
    None
        A one-line summary with the number of rows is printed instead.
    """
    # Step 1: Extract
    df_new = pd.read_csv(source_file)

    # Step 2: Transform - parse timestamps, compute the line total, and
    # derive the calendar date used for daily aggregation.
    df_new['timestamp'] = pd.to_datetime(df_new['timestamp'])
    df_new['total_amount'] = df_new['quantity'] * df_new['price']
    df_new['date'] = df_new['timestamp'].dt.date

    # Step 3: Load to database
    conn = sqlite3.connect(db_name)
    try:
        df_new.to_sql('sales_data', conn, if_exists='append', index=False)
    finally:
        # Close the handle whether or not the insert succeeded; the original
        # left one connection open per call.
        conn.close()

    print(f"✔ Loaded {len(df_new)} new rows into database.")

# Load the simulated feed into the local database, spelling out the defaults.
etl_pipeline(source_file='real_time_sales.csv', db_name='retail_data.db')


✔ Loaded 15 new rows into database.


In [14]:
# Top 3 products: total the revenue per product, keep the three largest
# totals, and flatten to a (product, total_amount) frame for plotting.
product_revenue = df.groupby('product')['total_amount'].sum()
top_products = product_revenue.nlargest(3).reset_index()

fig3 = px.pie(
    top_products,
    names='product',
    values='total_amount',
    title="Top 3 Products by Revenue",
).update_layout(width=700, height=400)
fig3.show()


In [15]:
# Hour of day (0-23) at which each order was placed.
df['hour'] = df['timestamp'].dt.hour

# Revenue per hour, flattened to an (hour, total_amount) frame.
hourly_totals = df.groupby('hour')['total_amount'].sum()
hourly_sales = hourly_totals.reset_index()

# Plot. The same module is imported at the top of the notebook; the local
# import is kept so this cell runs on its own.
import plotly.express as px
fig4 = px.line(
    hourly_sales,
    x='hour',
    y='total_amount',
    markers=True,
    title='Revenue Trend by Hour of Day',
).update_layout(width=700, height=400)
fig4.show()


In [16]:
# Flag abnormal transactions: more than 10 units on one order, or a
# negative unit price. A row matching either condition is kept.
too_many_units = df['quantity'] > 10
negative_price = df['price'] < 0
abnormal = df[too_many_units | negative_price]

print("Abnormal entries:")
print(abnormal)


Abnormal entries:
Empty DataFrame
Columns: [order_id, customer_id, product, quantity, price, timestamp, total_amount, date, hour]
Index: []


In [17]:
# Persist the flagged rows (possibly none) for later inspection, without the row index.
abnormal.to_csv(path_or_buf="abnormal_sales.csv", index=False)


In [18]:
import logging

# Configure logging: records at INFO level and above go to etl_log.txt,
# each line prefixed with the time it was logged.
logging.basicConfig(
    filename='etl_log.txt',
    level=logging.INFO,
    format='%(asctime)s %(message)s',
)

def etl_pipeline_log(source_file='real_time_sales.csv', db_name='retail_data.db'):
    """Run the CSV-to-SQLite load and record the outcome in the log.

    Reads ``source_file``, parses ``timestamp``, derives ``total_amount``
    and ``date``, and appends every row to the ``sales_data`` table in
    ``db_name``. The whole file is appended on each call, so repeated calls
    on the same file insert its rows again.

    Any exception is caught: it is logged with its traceback and a short
    failure notice is printed, so the caller never sees the error raised.

    Parameters
    ----------
    source_file : str
        CSV with at least ``timestamp``, ``quantity`` and ``price`` columns.
    db_name : str
        Path of the SQLite database file to append to.

    Returns
    -------
    None
    """
    try:
        df_new = pd.read_csv(source_file)
        df_new['timestamp'] = pd.to_datetime(df_new['timestamp'])
        df_new['total_amount'] = df_new['quantity'] * df_new['price']
        df_new['date'] = df_new['timestamp'].dt.date

        conn = sqlite3.connect(db_name)
        try:
            df_new.to_sql('sales_data', conn, if_exists='append', index=False)
        finally:
            # Close the handle on success and on failure alike; the original
            # left one connection open per call.
            conn.close()

        logging.info(f"Loaded {len(df_new)} new rows from {source_file}")
        print(f"✔ ETL successful with {len(df_new)} rows.")
    except Exception as e:
        # logging.exception logs at ERROR level like logging.error did, but
        # also attaches the traceback, which "Check log." relies on.
        logging.exception(f"ETL failed: {e}")
        print("❌ ETL failed. Check log.")

# Run the logged ETL with its defaults spelled out.
# NOTE(review): etl_pipeline() earlier in this notebook already appended this
# same file to sales_data, so this call inserts those rows a second time.
etl_pipeline_log(source_file='real_time_sales.csv', db_name='retail_data.db')


✔ ETL successful with 15 rows.


In [19]:
# Export each summary table to its own CSV file, without the row index.
for summary_frame, csv_name in (
    (product_sales, "total_sales_by_product.csv"),
    (daily_sales, "daily_sales.csv"),
    (hourly_sales, "hourly_sales.csv"),
):
    summary_frame.to_csv(csv_name, index=False)
