# In [121]:
## import necessary modules
import csv 
from datetime import datetime

###----- functions for data extraction from CSV files ------###
def readfile(file_path):
    """Read a CSV file and return its rows as a list of dictionaries.

    The first line of the file is used as the header; every following row
    becomes a dictionary keyed by those header names (``csv.DictReader``).

    Args:
        file_path: Path of the CSV file to read.

    Returns:
        list: One dictionary per data row, in file order. An empty file
        yields an empty list.

    Raises:
        OSError: If the file cannot be opened.
    """
    # newline="" hands line-ending handling to the csv module, so line breaks
    # embedded in quoted fields come through unchanged.
    with open(file_path, "r", newline="") as file:
        return list(csv.DictReader(file))

###------- functions for data transformation -------###
### function to clean sales data
def clean_sales(sales):
    """Drop duplicate and incomplete sale records.

    A record is kept only when its ``'sale_id'`` has not been kept before
    and every one of its values is truthy (no empty string, no ``None``).
    A rejected incomplete record does not reserve its id, so a later
    complete record with the same id is still accepted.

    Args:
        sales: Iterable of sale dictionaries, each containing ``'sale_id'``.

    Returns:
        list: The surviving records, in their original order.
    """
    accepted_ids = set()
    kept = []
    for record in sales:
        record_id = record['sale_id']
        if record_id in accepted_ids:
            continue  # duplicate of a record that was already kept
        if not all(record.values()):
            continue  # at least one field is empty
        accepted_ids.add(record_id)
        kept.append(record)
    return kept

## function to integrate sales,product and customer data
def integrate_data(sales, products, customers, date_format='%d/%m/%Y'):
    """Join sales with product and customer data and add derived fields.

    Sales whose ``product_id`` or ``customer_id`` has no match in the
    corresponding table are left out of the result.

    Args:
        sales: Iterable of sale dicts with the keys ``sale_id``, ``date``,
            ``customer_id``, ``product_id``, ``quantity`` and ``price``.
        products: Iterable of product dicts with the keys ``product_id``,
            ``product_name``, ``category`` and ``cost_price``.
        customers: Iterable of customer dicts with the keys
            ``customer_id`` and ``name``.
        date_format: ``strptime`` pattern used to parse ``sale['date']``.
            Defaults to day/month/year, e.g. ``15/08/2024``.

    Returns:
        list: One dict per matched sale holding the identifying fields,
        ``quantity`` (int), ``price`` (float), ``total_amount``
        (quantity * price), ``profit_margin`` (total amount minus
        quantity * cost price), both rounded to 2 decimals, and the
        ``day``, ``month``, ``year`` and ``quarter`` of the sale date.

    Raises:
        ValueError: If a quantity, price, cost price or date cannot be
            parsed.
        KeyError: If a record lacks one of the keys listed above.
    """
    ## lookup index for product and customer
    product_lookup = {product['product_id']: product for product in products}
    customer_lookup = {customer['customer_id']: customer for customer in customers}

    joined_data = []
    for sale in sales:
        product = product_lookup.get(sale['product_id'])
        customer = customer_lookup.get(sale['customer_id'])
        if not (product and customer):
            continue  ## the sale references an unknown product or customer

        ## convert every raw text field exactly once and reuse the result
        quantity = int(sale['quantity'])
        price = float(sale['price'])
        cost_price = float(product['cost_price'])
        sale_date = datetime.strptime(sale['date'], date_format)
        revenue = quantity * price

        joined_data.append({
            "sale_id": sale['sale_id'],
            "date": sale['date'],
            "customer_id": sale['customer_id'],
            "customer_name": customer['name'],
            "product_id": sale['product_id'],
            "product_name": product['product_name'],
            "category": product['category'],
            "quantity": quantity,
            "price": price,

            ### calculating derived fields
            "total_amount": round(revenue, 2),
            "profit_margin": round(revenue - quantity * cost_price, 2),
            ### extracting date fields
            "day": sale_date.day,
            "month": sale_date.month,
            "year": sale_date.year,
            "quarter": (sale_date.month - 1) // 3 + 1,  ## months 1-3 -> 1, ..., 10-12 -> 4
        })
    return joined_data


###------- functions for data analysis -------###
### a. total sales for all product categories
def calc_category_sales(sales):
    """Sum ``total_amount`` per product category.

    Args:
        sales: Iterable of records providing ``'category'`` and
            ``'total_amount'``.

    Returns:
        dict: category -> running total, rounded to 2 decimals after
        every addition, in order of first appearance.
    """
    totals = {}
    for row in sales:
        name = row['category']
        value = float(row['total_amount'])
        if name not in totals:
            totals[name] = 0
        totals[name] = round(totals[name] + value, 2)
    return totals
    
### b. profits for each category
def calc_category_profit(sales):
    """Sum ``profit_margin`` per product category.

    Args:
        sales: Iterable of records providing ``'category'`` and
            ``'profit_margin'``.

    Returns:
        dict: category -> running profit, rounded to 2 decimals after
        every addition, in order of first appearance.
    """
    profits = {}
    for entry in sales:
        group = entry['category']
        gain = float(entry['profit_margin'])
        previous = profits[group] if group in profits else 0
        profits[group] = round(previous + gain, 2)
    return profits
    
### c. identify top selling products
def find_top_product(sales, n):
    """Return the ``n`` best-selling products ranked by total sales amount.

    Args:
        sales: Iterable of records providing ``'product_name'`` and
            ``'total_amount'``.
        n: Maximum number of products to return.

    Returns:
        dict: product name -> summed sales amount (rounded to 2 decimals
        after every addition), ordered from highest to lowest total.
        Products with equal totals keep their order of first appearance.
        Empty when ``sales`` is empty.
    """
    product_sales = {}
    for sale in sales:
        product = sale['product_name']
        amount = float(sale['total_amount'])
        ## accumulate in this function's own dict so totals never depend on
        ## any module-level state
        product_sales[product] = round(product_sales.get(product, 0) + amount, 2)

    ## rank once, after every sale has been added up
    ranked = sorted(product_sales.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked[:n])

### d. identify top customers by total sales amount
def find_top_customer(sales, n):
    """Return the ``n`` customers with the highest total sales amount.

    Customers are grouped by ``'customer_name'``, so records that share a
    name are added together.

    Args:
        sales: Iterable of records providing ``'customer_name'`` and
            ``'total_amount'``.
        n: Maximum number of customers to return.

    Returns:
        dict: customer name -> summed sales amount (rounded to 2 decimals
        after every addition), ordered from highest to lowest total.
        Customers with equal totals keep their order of first appearance.
        Empty when ``sales`` is empty.
    """
    customer_sales = {}
    for sale in sales:
        customer = sale['customer_name']
        amount = float(sale['total_amount'])
        ## accumulate in this function's own dict so totals never depend on
        ## any module-level state
        customer_sales[customer] = round(customer_sales.get(customer, 0) + amount, 2)

    ## rank once, after every sale has been added up
    ranked = sorted(customer_sales.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked[:n])

### e. calculate monthly sales
def calc_monthly_sales(sales):
    """Sum ``total_amount`` per calendar month.

    The grouping key is ``'month'`` alone, so the same month of different
    years lands in one bucket.

    Args:
        sales: Iterable of records providing ``'month'`` and
            ``'total_amount'``.

    Returns:
        dict: month -> running total, rounded to 2 decimals after every
        addition, in order of first appearance.
    """
    totals = {}
    for record in sales:
        period = record['month']
        value = float(record['total_amount'])
        try:
            so_far = totals[period]
        except KeyError:
            so_far = 0  # first sale seen for this month
        totals[period] = round(so_far + value, 2)
    return totals
    
### f. calculate yearly sales
def calc_yearly_sales(sales):
    """Sum ``total_amount`` per year.

    Args:
        sales: Iterable of records providing ``'year'`` and
            ``'total_amount'``.

    Returns:
        dict: year -> running total, rounded to 2 decimals after every
        addition, in order of first appearance.
    """
    per_year = {}
    for item in sales:
        key = item['year']
        value = float(item['total_amount'])
        per_year.setdefault(key, 0)
        per_year[key] = round(per_year[key] + value, 2)
    return per_year

### g. calculate quarterly sales
def calc_quarterly_sales(sales):
    """Sum ``total_amount`` per quarter.

    Args:
        sales: Iterable of records providing ``'quarter'`` and
            ``'total_amount'``.

    Returns:
        dict: quarter -> running total, rounded to 2 decimals after every
        addition, in order of first appearance.
    """
    by_quarter = {}
    for line in sales:
        quarter = line['quarter']
        value = float(line['total_amount'])
        base = 0
        if quarter in by_quarter:
            base = by_quarter[quarter]
        by_quarter[quarter] = round(base + value, 2)
    return by_quarter

#######------- ETL process -------------#######
### Runs the whole pipeline: read the three CSV inputs, clean and join them,
### compute the aggregates, then write processed_data.csv and Summary.txt.

### Extraction phase:
### 1. Open and read all files
salesdata = readfile("sales_csv.csv")
productdata = readfile("Product_csv.csv")
customerdata = readfile("Customer_csv.csv")

## Transformation phase:
## 2. clean sales data
cleaned_salesdata = clean_sales(salesdata)

## 3. integrate data
processed_data = integrate_data(cleaned_salesdata, productdata, customerdata)

## Stop before any output file is opened (and thereby truncated) when there
## is nothing to report; the CSV header below is taken from the first row.
if not processed_data:
    raise ValueError(
        "No sales rows left after cleaning and integration; "
        "nothing to analyse or write."
    )

## 4. Data analysis
TOP_PRODUCT_COUNT = 2   ## how many best-selling products to report
TOP_CUSTOMER_COUNT = 5  ## how many top customers to compute

category_sales = calc_category_sales(processed_data)
category_profit = calc_category_profit(processed_data)
top_products = find_top_product(processed_data, TOP_PRODUCT_COUNT)
top_customers = find_top_customer(processed_data, TOP_CUSTOMER_COUNT)
monthly_sales = calc_monthly_sales(processed_data)
yearly_sales = calc_yearly_sales(processed_data)
quarterly_sales = calc_quarterly_sales(processed_data)

## Data loading
## 5. comprehensive csv file with processed data
with open('processed_data.csv', 'w', newline='') as file:
    header = list(processed_data[0].keys())  ## extract all keys as headers
    writer = csv.DictWriter(file, fieldnames=header)
    writer.writeheader()
    writer.writerows(processed_data)

## 6. Generate a summary report with the sales aggregates
title = "Sales Summary Report 2024-25"
line_1 = f"The total sales for each category is: {category_sales}."
## the label is built from the same count that was passed to find_top_product
line_2 = f"The top {TOP_PRODUCT_COUNT} selling products are: {top_products}."
line_3 = f"The profits for each category is: {category_profit}."
line_4 = f"The monthly sales are as follows: {monthly_sales}."
line_5 = f"The quarterly sales are as follows: {quarterly_sales}."
report = [title, line_1, line_2, line_3, line_4, line_5]

with open('Summary.txt', 'w') as f:
    ## the report is one joined string, so write() is the matching call
    f.write('\n'.join(report))
