In [111]:
def read_sales_data(file_path):
    """
    Reads raw sales data from a pipe-delimited text file
    and returns a list of raw lines.
    """
    try:
        with open(file_path, 'r', encoding='latin-1') as file:
            lines = file.readlines()
        return lines
    except FileNotFoundError:
        print("File not found. Please check the path.")
        return []


In [112]:
file_path = r"C:\Users\Sneha\Downloads\sales_data.txt"

raw_lines = read_sales_data(file_path)

print("Total lines read:", len(raw_lines))
print("\nFirst 3 lines:")
for line in raw_lines[:3]:
    print(line)




Total lines read: 82

First 3 lines:
TransactionID|Date|ProductID|ProductName|Quantity|UnitPrice|CustomerID|Region

T018|2024-12-29|P107|USB Cable|8|173|C009|South

T063|2024-12-07|P110|Laptop Charger|6|1,916|C022|East



In [113]:
def parse_sales_data(raw_lines):
    parsed_rows = []

    for line in raw_lines:
        line = line.strip()

        if not line:
            continue

        if line.startswith("TransactionID"):
            continue

        
        fields = line.split("|")

        parsed_rows.append(fields)

    return parsed_rows


In [114]:
parsed_data = parse_sales_data(raw_lines)

print(parsed_data[0])


['T018', '2024-12-29', 'P107', 'USB Cable', '8', '173', 'C009', 'South']


In [115]:
def clean_sales_data(parsed_rows):
    valid_records = []
    invalid_count = 0

    for row in parsed_rows:

    
        if len(row) != 8:
            invalid_count += 1
            continue

        transaction_id, date, product_id, product_name, quantity, unit_price, customer_id, region = row

        
        if not transaction_id.startswith("T"):
            invalid_count += 1
            continue

        
        if not customer_id.strip() or not region.strip():
            invalid_count += 1
            continue

        
        product_name = product_name.replace(",", "")

        
        try:
            quantity = int(quantity.replace(",", ""))
            if quantity <= 0:
                invalid_count += 1
                continue
        except:
            invalid_count += 1
            continue

        
        try:
            unit_price = float(unit_price.replace(",", ""))
            if unit_price <= 0:
                invalid_count += 1
                continue
        except:
            invalid_count += 1
            continue

        valid_records.append([
            transaction_id,
            date,
            product_id,
            product_name,
            quantity,
            unit_price,
            customer_id,
            region
        ])

    return valid_records, invalid_count


In [116]:
cleaned_data, invalid_records = clean_sales_data(parsed_data)

total_parsed = len(parsed_data)
valid_records = len(cleaned_data)

print(f"Total records parsed: {total_parsed}")
print(f"Invalid records removed: {invalid_records}")
print(f"Valid records after cleaning: {valid_records}")


Total records parsed: 80
Invalid records removed: 10
Valid records after cleaning: 70


In [None]:
#Q2 part 1

In [None]:
#1.1
def read_sales_data(filename):
    """
    Reads sales data from file handling encoding issues
    Returns list of raw transaction lines (strings)
    """
    encodings = ['utf-8', 'latin-1', 'cp1252']

    for encoding in encodings:
        try:
            with open(filename, 'r', encoding=encoding) as file:
                lines = file.readlines()

            
            cleaned_lines = []
            for line in lines:
                line = line.strip()
                if not line:
                    continue
                if line.startswith("TransactionID"):
                    continue
                cleaned_lines.append(line)

            return cleaned_lines

        except UnicodeDecodeError:
            continue

        except FileNotFoundError:
            print(f"Error: File '{filename}' not found.")
            return []

    print("Error: Unable to read file with supported encodings.")
    return []


In [None]:
#1.1
raw_lines = read_sales_data(r"C:\Users\Sneha\Downloads\sales_data.txt")

print("Number of transaction lines:", len(raw_lines))
print("First transaction:")
print(raw_lines[0])


Number of transaction lines: 80
First transaction:
T018|2024-12-29|P107|USB Cable|8|173|C009|South


In [None]:
#1.2
def parse_transactions(raw_lines):
    """
    Parses raw lines into clean list of dictionaries
    """
    transactions = []

    for line in raw_lines:
        fields = line.split("|")

        if len(fields) != 8:
            continue

        transaction_id, date, product_id, product_name, quantity, unit_price, customer_id, region = fields

        product_name = product_name.replace(",", "")

        try:
            quantity = int(quantity.replace(",", ""))
            unit_price = float(unit_price.replace(",", ""))
        except ValueError:
            continue

        transaction = {
            'TransactionID': transaction_id,
            'Date': date,
            'ProductID': product_id,
            'ProductName': product_name,
            'Quantity': quantity,
            'UnitPrice': unit_price,
            'CustomerID': customer_id,
            'Region': region
        }

        transactions.append(transaction)

    return transactions


In [None]:
#1.2
transactions = parse_transactions(raw_lines)

print("Parsed transactions:", len(transactions))
print("Sample transaction:")
print(transactions[0])


Parsed transactions: 80
Sample transaction:
{'TransactionID': 'T018', 'Date': '2024-12-29', 'ProductID': 'P107', 'ProductName': 'USB Cable', 'Quantity': 8, 'UnitPrice': 173.0, 'CustomerID': 'C009', 'Region': 'South'}


In [None]:
#1.3
def validate_and_filter(transactions, region=None, min_amount=None, max_amount=None):
    valid_transactions = []
    invalid_count = 0

    
    for t in transactions:
        if (
            t['Quantity'] <= 0 or
            t['UnitPrice'] <= 0 or
            not t['TransactionID'].startswith("T") or
            not t['ProductID'].startswith("P") or
            not t['CustomerID'].startswith("C") or
            not t['Region'].strip()
        ):
            invalid_count += 1
            continue

        valid_transactions.append(t)

    
    regions = set(t['Region'] for t in valid_transactions)
    amounts = [t['Quantity'] * t['UnitPrice'] for t in valid_transactions]

    print("Available regions:", regions)
    print("Transaction amount range:", min(amounts), "-", max(amounts))

    
    filtered_transactions = []
    filtered_by_region = 0
    filtered_by_amount = 0

    for t in valid_transactions:
        amount = t['Quantity'] * t['UnitPrice']

        if region and t['Region'] != region:
            filtered_by_region += 1
            continue

        if min_amount and amount < min_amount:
            filtered_by_amount += 1
            continue

        if max_amount and amount > max_amount:
            filtered_by_amount += 1
            continue

        filtered_transactions.append(t)

    summary = {
        'total_input': len(transactions),
        'invalid': invalid_count,
        'filtered_by_region': filtered_by_region,
        'filtered_by_amount': filtered_by_amount,
        'final_count': len(filtered_transactions)
    }

    return filtered_transactions, invalid_count, summary


In [None]:
#1.3
valid_txns, invalid_count, summary = validate_and_filter(
    transactions,
    region="South",
    min_amount=500,
    max_amount=50000
)

print("\nSummary:")
print(summary)
print("\nSample valid transaction:")
print(valid_txns[0])


Available regions: {'North', 'West', 'East', 'South'}
Transaction amount range: 257.0 - 818960.0

Summary:
{'total_input': 80, 'invalid': 10, 'filtered_by_region': 57, 'filtered_by_amount': 2, 'final_count': 11}

Sample valid transaction:
{'TransactionID': 'T018', 'Date': '2024-12-29', 'ProductID': 'P107', 'ProductName': 'USB Cable', 'Quantity': 8, 'UnitPrice': 173.0, 'CustomerID': 'C009', 'Region': 'South'}


In [124]:
#Q3 part 2

In [125]:
#2.1a 
def calculate_total_revenue(transactions):
    
    total_revenue = 0.0

    for t in transactions:
        total_revenue += t['Quantity'] * t['UnitPrice']

    return total_revenue


In [126]:
total_revenue = calculate_total_revenue(valid_txns)

print("Total Revenue:", total_revenue)


Total Revenue: 75177.0


In [127]:
#2.1b
def region_wise_sales(transactions):
    region_stats = {}
    total_revenue = 0.0

    for t in transactions:
        region = t['Region']
        amount = t['Quantity'] * t['UnitPrice']
        total_revenue += amount

        if region not in region_stats:
            region_stats[region] = {
                'total_sales': 0.0,
                'transaction_count': 0
            }

        region_stats[region]['total_sales'] += amount
        region_stats[region]['transaction_count'] += 1

    for region in region_stats:
        region_stats[region]['percentage'] = round(
            (region_stats[region]['total_sales'] / total_revenue) * 100, 2
        )

    sorted_region_stats = dict(
        sorted(
            region_stats.items(),
            key=lambda item: item[1]['total_sales'],
            reverse=True
        )
    )

    return sorted_region_stats


# -------------------------------
# MAIN EXECUTION (TESTING FLOW)
# -------------------------------

file_path = r"C:\Users\Sneha\Downloads\sales_data.txt"

raw_lines = read_sales_data(file_path)
transactions = parse_transactions(raw_lines)


valid_txns, invalid_count, summary = validate_and_filter(transactions)

print("\nValidation Summary:")
print(summary)

# Total Revenue
total_revenue = calculate_total_revenue(valid_txns)
print("\nTotal Revenue:", total_revenue)

# Region-wise Sales
region_sales = region_wise_sales(valid_txns)
print("\nRegion-wise Sales:")
print(region_sales)

Available regions: {'North', 'West', 'East', 'South'}
Transaction amount range: 257.0 - 818960.0

Validation Summary:
{'total_input': 80, 'invalid': 10, 'filtered_by_region': 0, 'filtered_by_amount': 0, 'final_count': 70}

Total Revenue: 3527808.0

Region-wise Sales:
{'North': {'total_sales': 1321605.0, 'transaction_count': 21, 'percentage': 37.46}, 'South': {'total_sales': 889332.0, 'transaction_count': 13, 'percentage': 25.21}, 'West': {'total_sales': 848902.0, 'transaction_count': 19, 'percentage': 24.06}, 'East': {'total_sales': 467969.0, 'transaction_count': 17, 'percentage': 13.27}}


In [128]:
#2.1c
def top_selling_products(transactions, n=5):
    """
    Finds top n products by total quantity sold

    Returns: list of tuples
    (ProductName, TotalQuantity, TotalRevenue)
    """
    product_stats = {}

    for t in transactions:
        product = t['ProductName']
        quantity = t['Quantity']
        revenue = t['Quantity'] * t['UnitPrice']

        if product not in product_stats:
            product_stats[product] = {
                'total_quantity': 0,
                'total_revenue': 0.0
            }

        product_stats[product]['total_quantity'] += quantity
        product_stats[product]['total_revenue'] += revenue

    product_list = [
        (product,
         stats['total_quantity'],
         stats['total_revenue'])
        for product, stats in product_stats.items()
    ]

    product_list.sort(key=lambda x: x[1], reverse=True)

    return product_list[:n]


In [129]:
top_products = top_selling_products(valid_txns, n=5)

print(top_products)


[('Mouse', 61, 40297.0), ('Wireless Mouse', 45, 49981.0), ('Webcam', 35, 128187.0), ('USB Cable', 33, 7622.0), ('Monitor', 30, 493759.0)]


In [130]:
#2.1d
def customer_analysis(transactions):
    customer_stats = {}

    # Aggregate per customer
    for t in transactions:
        customer = t['CustomerID']
        product = t['ProductName']
        amount = t['Quantity'] * t['UnitPrice']

        if customer not in customer_stats:
            customer_stats[customer] = {
                'total_spent': 0.0,
                'purchase_count': 0,
                'products_bought': set()
            }

        customer_stats[customer]['total_spent'] += amount
        customer_stats[customer]['purchase_count'] += 1
        customer_stats[customer]['products_bought'].add(product)

    # Final calculations
    for customer in customer_stats:
        total = customer_stats[customer]['total_spent']
        count = customer_stats[customer]['purchase_count']

        customer_stats[customer]['avg_order_value'] = round(total / count, 2)
        customer_stats[customer]['products_bought'] = list(
            customer_stats[customer]['products_bought']
        )

    # Sort by total_spent descending
    sorted_customer_stats = dict(
        sorted(
            customer_stats.items(),
            key=lambda item: item[1]['total_spent'],
            reverse=True
        )
    )

    return sorted_customer_stats


In [131]:
customer_stats = customer_analysis(valid_txns)
print(customer_stats)


{'C004': {'total_spent': 857124.0, 'purchase_count': 3, 'products_bought': ['Laptop Charger', 'LaptopPremium', 'Headphones'], 'avg_order_value': 285708.0}, 'C017': {'total_spent': 762460.0, 'purchase_count': 1, 'products_bought': ['LaptopPremium'], 'avg_order_value': 762460.0}, 'C010': {'total_spent': 457186.0, 'purchase_count': 3, 'products_bought': ['External Hard Drive1TB', 'LaptopPremium', 'MouseWireless'], 'avg_order_value': 152395.33}, 'C024': {'total_spent': 249451.0, 'purchase_count': 2, 'products_bought': ['MonitorLED', 'Monitor'], 'avg_order_value': 124725.5}, 'C008': {'total_spent': 216176.0, 'purchase_count': 5, 'products_bought': ['Monitor', 'Laptop', 'Laptop Charger', 'Mouse', 'Wireless Mouse'], 'avg_order_value': 43235.2}, 'C023': {'total_spent': 165391.0, 'purchase_count': 2, 'products_bought': ['Webcam', 'Monitor'], 'avg_order_value': 82695.5}, 'C003': {'total_spent': 118144.0, 'purchase_count': 3, 'products_bought': ['Mouse', 'LaptopPremium', 'Wireless Mouse'], 'avg_o

In [132]:
#2.2a
def daily_sales_trend(transactions):
    """
    Analyzes sales trends by date

    Returns: dictionary sorted by date
    """
    daily_stats = {}

    # Group data by date
    for t in transactions:
        date = t['Date']
        amount = t['Quantity'] * t['UnitPrice']
        customer = t['CustomerID']

        if date not in daily_stats:
            daily_stats[date] = {
                'revenue': 0.0,
                'transaction_count': 0,
                'unique_customers': set()
            }

        daily_stats[date]['revenue'] += amount
        daily_stats[date]['transaction_count'] += 1
        daily_stats[date]['unique_customers'].add(customer)

    # Convert unique customers setttt to count
    for date in daily_stats:
        daily_stats[date]['unique_customers'] = len(
            daily_stats[date]['unique_customers']
        )

    # Sort by date 
    sorted_daily_stats = dict(
        sorted(daily_stats.items(), key=lambda item: item[0])
    )

    return sorted_daily_stats


In [133]:
daily_trend = daily_sales_trend(valid_txns)
print(daily_trend)


{'2024-12-01': {'revenue': 123969.0, 'transaction_count': 3, 'unique_customers': 2}, '2024-12-02': {'revenue': 882906.0, 'transaction_count': 5, 'unique_customers': 5}, '2024-12-03': {'revenue': 61851.0, 'transaction_count': 5, 'unique_customers': 5}, '2024-12-05': {'revenue': 257.0, 'transaction_count': 1, 'unique_customers': 1}, '2024-12-06': {'revenue': 34072.0, 'transaction_count': 1, 'unique_customers': 1}, '2024-12-07': {'revenue': 204912.0, 'transaction_count': 10, 'unique_customers': 7}, '2024-12-08': {'revenue': 70383.0, 'transaction_count': 3, 'unique_customers': 3}, '2024-12-09': {'revenue': 25339.0, 'transaction_count': 4, 'unique_customers': 4}, '2024-12-10': {'revenue': 1550.0, 'transaction_count': 1, 'unique_customers': 1}, '2024-12-11': {'revenue': 13207.0, 'transaction_count': 2, 'unique_customers': 2}, '2024-12-13': {'revenue': 417923.0, 'transaction_count': 3, 'unique_customers': 3}, '2024-12-14': {'revenue': 45349.0, 'transaction_count': 2, 'unique_customers': 2}, '

In [134]:
#2.2b
def find_peak_sales_day(transactions):
    """
    Identifies the date with highest revenue

    Returns: tuple (date, revenue, transaction_count)
    """
    daily_revenue = {}

    # Aggregate revenue and transaction count per date
    for t in transactions:
        date = t['Date']
        amount = t['Quantity'] * t['UnitPrice']

        if date not in daily_revenue:
            daily_revenue[date] = {
                'revenue': 0.0,
                'transaction_count': 0
            }

        daily_revenue[date]['revenue'] += amount
        daily_revenue[date]['transaction_count'] += 1

    # Finding the date with highest revenue
    peak_date = max(
        daily_revenue.items(),
        key=lambda item: item[1]['revenue']
    )

    return (
        peak_date[0],
        peak_date[1]['revenue'],
        peak_date[1]['transaction_count']
    )


In [135]:
peak_day = find_peak_sales_day(valid_txns)
print(peak_day)


('2024-12-02', 882906.0, 5)


In [136]:
#2.3a
def low_performing_products(transactions, threshold=10):
    """
    Identifies products with low sales

    Returns: list of tuples
    (ProductName, TotalQuantity, TotalRevenue)
    """
    product_stats = {}

    # Aggregate quantity and revenue by product
    for t in transactions:
        product = t['ProductName']
        quantity = t['Quantity']
        revenue = t['Quantity'] * t['UnitPrice']

        if product not in product_stats:
            product_stats[product] = {
                'total_quantity': 0,
                'total_revenue': 0.0
            }

        product_stats[product]['total_quantity'] += quantity
        product_stats[product]['total_revenue'] += revenue

    # Filter products with quantity < threshold
    low_products = [
        (product,
         stats['total_quantity'],
         stats['total_revenue'])
        for product, stats in product_stats.items()
        if stats['total_quantity'] < threshold
    ]

    # Sorting by total quantity ascending
    low_products.sort(key=lambda x: x[1])

    return low_products


In [137]:
low_products = low_performing_products(valid_txns, threshold=10)
print(low_products)


[('Laptop', 3, 184329.0), ('KeyboardMechanical', 5, 13360.0), ('WebcamHD', 6, 17862.0), ('Laptop Charger65W', 7, 19922.0), ('MouseWireless', 8, 6784.0)]


In [138]:
#Q4 part 3

In [139]:
import requests


In [140]:
#1
def fetch_all_products():
    """
    Fetches all products from DummyJSON API

    Returns:
    - list of product dictionaries
    """
    url = "https://dummyjson.com/products"

    try:
        response = requests.get(url, timeout=10)

        
        response.raise_for_status()

        data = response.json()

       
        products = data.get("products", [])

        print(f"Total products fetched: {len(products)}")
        print(f"Total products available (API): {data.get('total')}")

        return products

    except requests.exceptions.Timeout:
        print("Error: Request timed out.")
        return []

    except requests.exceptions.RequestException as e:
        print(f"Error fetching products: {e}")
        return []


In [141]:
products = fetch_all_products()


print(products[:2])


Total products fetched: 30
Total products available (API): 194
[{'id': 1, 'title': 'Essence Mascara Lash Princess', 'description': 'The Essence Mascara Lash Princess is a popular mascara known for its volumizing and lengthening effects. Achieve dramatic lashes with this long-lasting and cruelty-free formula.', 'category': 'beauty', 'price': 9.99, 'discountPercentage': 10.48, 'rating': 2.56, 'stock': 99, 'tags': ['beauty', 'mascara'], 'brand': 'Essence', 'sku': 'BEA-ESS-ESS-001', 'weight': 4, 'dimensions': {'width': 15.14, 'height': 13.08, 'depth': 22.99}, 'warrantyInformation': '1 week warranty', 'shippingInformation': 'Ships in 3-5 business days', 'availabilityStatus': 'In Stock', 'reviews': [{'rating': 3, 'comment': 'Would not recommend!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Eleanor Collins', 'reviewerEmail': 'eleanor.collins@x.dummyjson.com'}, {'rating': 4, 'comment': 'Very satisfied!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Lucas Gordon', 'reviewerEma

In [142]:
#2
def fetch_product_by_id(product_id):
    """
    Fetches a single product by ID from DummyJSON API

    Returns:
    - product dictionary
    """
    url = f"https://dummyjson.com/products/{product_id}"

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()

        product = response.json()
        return product

    except requests.exceptions.Timeout:
        print("Error: Request timed out.")
        return {}

    except requests.exceptions.HTTPError:
        print(f"Error: Product with ID {product_id} not found.")
        return {}

    except requests.exceptions.RequestException as e:
        print(f"Error fetching product: {e}")
        return {}


In [143]:
product = fetch_product_by_id(1)
print(product)


{'id': 1, 'title': 'Essence Mascara Lash Princess', 'description': 'The Essence Mascara Lash Princess is a popular mascara known for its volumizing and lengthening effects. Achieve dramatic lashes with this long-lasting and cruelty-free formula.', 'category': 'beauty', 'price': 9.99, 'discountPercentage': 10.48, 'rating': 2.56, 'stock': 99, 'tags': ['beauty', 'mascara'], 'brand': 'Essence', 'sku': 'BEA-ESS-ESS-001', 'weight': 4, 'dimensions': {'width': 15.14, 'height': 13.08, 'depth': 22.99}, 'warrantyInformation': '1 week warranty', 'shippingInformation': 'Ships in 3-5 business days', 'availabilityStatus': 'In Stock', 'reviews': [{'rating': 3, 'comment': 'Would not recommend!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Eleanor Collins', 'reviewerEmail': 'eleanor.collins@x.dummyjson.com'}, {'rating': 4, 'comment': 'Very satisfied!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Lucas Gordon', 'reviewerEmail': 'lucas.gordon@x.dummyjson.com'}, {'rating': 5, 'comment': '

In [144]:
#3
def fetch_products_with_limit(limit=30):
    """
    Fetches a specific number of products from DummyJSON API

    Parameters:
    - limit: number of products to fetch

    Returns:
    - list of product dictionaries
    """
    url = f"https://dummyjson.com/products?limit={limit}"

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()

        data = response.json()
        products = data.get("products", [])

        print(f"Requested products: {limit}")
        print(f"Products fetched: {len(products)}")

        return products

    except requests.exceptions.Timeout:
        print("Error: Request timed out.")
        return []

    except requests.exceptions.RequestException as e:
        print(f"Error fetching products: {e}")
        return []


In [145]:
products_100 = fetch_products_with_limit(100)

print(len(products_100))
print(products_100[:2])  


Requested products: 100
Products fetched: 100
100
[{'id': 1, 'title': 'Essence Mascara Lash Princess', 'description': 'The Essence Mascara Lash Princess is a popular mascara known for its volumizing and lengthening effects. Achieve dramatic lashes with this long-lasting and cruelty-free formula.', 'category': 'beauty', 'price': 9.99, 'discountPercentage': 10.48, 'rating': 2.56, 'stock': 99, 'tags': ['beauty', 'mascara'], 'brand': 'Essence', 'sku': 'BEA-ESS-ESS-001', 'weight': 4, 'dimensions': {'width': 15.14, 'height': 13.08, 'depth': 22.99}, 'warrantyInformation': '1 week warranty', 'shippingInformation': 'Ships in 3-5 business days', 'availabilityStatus': 'In Stock', 'reviews': [{'rating': 3, 'comment': 'Would not recommend!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Eleanor Collins', 'reviewerEmail': 'eleanor.collins@x.dummyjson.com'}, {'rating': 4, 'comment': 'Very satisfied!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Lucas Gordon', 'reviewerEmail': 'lucas.g

In [146]:
#4
def search_products(query):
    """
    Searches products from DummyJSON API using a query string

    Parameters:
    - query: search keyword (string)

    Returns:
    - list of matching product dictionaries
    """
    url = f"https://dummyjson.com/products/search?q={query}"

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()

        data = response.json()
        products = data.get("products", [])

        print(f"Search query: '{query}'")
        print(f"Products found: {len(products)}")

        return products

    except requests.exceptions.Timeout:
        print("Error: Request timed out.")
        return []

    except requests.exceptions.RequestException as e:
        print(f"Error searching products: {e}")
        return []


In [147]:
search_results = search_products("phone")

print(search_results[:2])  


Search query: 'phone'
Products found: 23
[{'id': 101, 'title': 'Apple AirPods Max Silver', 'description': 'The Apple AirPods Max in Silver are premium over-ear headphones with high-fidelity audio, adaptive EQ, and active noise cancellation. Experience immersive sound in style.', 'category': 'mobile-accessories', 'price': 549.99, 'discountPercentage': 13.67, 'rating': 3.47, 'stock': 59, 'tags': ['electronics', 'over-ear headphones'], 'brand': 'Apple', 'sku': 'MOB-APP-APP-101', 'weight': 2, 'dimensions': {'width': 24.88, 'height': 14.9, 'depth': 27.54}, 'warrantyInformation': 'No warranty', 'shippingInformation': 'Ships in 2 weeks', 'availabilityStatus': 'In Stock', 'reviews': [{'rating': 5, 'comment': 'Excellent quality!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Henry Adams', 'reviewerEmail': 'henry.adams@x.dummyjson.com'}, {'rating': 4, 'comment': 'Very happy with my purchase!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Elijah Cruz', 'reviewerEmail': 'elijah.cru

In [148]:
#3.1

In [149]:
#a
import requests

def fetch_all_products():
    """
    Fetches all products from DummyJSON API

    Returns: list of product dictionaries
    """
    url = "https://dummyjson.com/products?limit=100"

    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()

        data = response.json()
        products = data.get("products", [])

        print(f"Successfully fetched {len(products)} products.")
        return products

    except requests.exceptions.Timeout:
        print("Error: API request timed out.")
        return []

    except requests.exceptions.RequestException as e:
        print(f"Error fetching products: {e}")
        return []


In [150]:
products = fetch_all_products()

print(products[:2])   
print(len(products))  


Successfully fetched 100 products.
[{'id': 1, 'title': 'Essence Mascara Lash Princess', 'description': 'The Essence Mascara Lash Princess is a popular mascara known for its volumizing and lengthening effects. Achieve dramatic lashes with this long-lasting and cruelty-free formula.', 'category': 'beauty', 'price': 9.99, 'discountPercentage': 10.48, 'rating': 2.56, 'stock': 99, 'tags': ['beauty', 'mascara'], 'brand': 'Essence', 'sku': 'BEA-ESS-ESS-001', 'weight': 4, 'dimensions': {'width': 15.14, 'height': 13.08, 'depth': 22.99}, 'warrantyInformation': '1 week warranty', 'shippingInformation': 'Ships in 3-5 business days', 'availabilityStatus': 'In Stock', 'reviews': [{'rating': 3, 'comment': 'Would not recommend!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Eleanor Collins', 'reviewerEmail': 'eleanor.collins@x.dummyjson.com'}, {'rating': 4, 'comment': 'Very satisfied!', 'date': '2025-04-30T09:41:02.053Z', 'reviewerName': 'Lucas Gordon', 'reviewerEmail': 'lucas.gordon@x.dummyjs

In [151]:
#b
def create_product_mapping(api_products):
    """
    Creates a mapping of product IDs to product info

    Parameters:
    - api_products: list of products from fetch_all_products()

    Returns:
    - dictionary mapping product IDs to product info
    """
    product_mapping = {}

    for product in api_products:
        product_id = product.get('id')

        product_mapping[product_id] = {
            'title': product.get('title'),
            'category': product.get('category'),
            'brand': product.get('brand'),
            'rating': product.get('rating')
        }

    return product_mapping


In [152]:
api_products = fetch_all_products()
product_map = create_product_mapping(api_products)

print(product_map[1])  


Successfully fetched 100 products.
{'title': 'Essence Mascara Lash Princess', 'category': 'beauty', 'brand': 'Essence', 'rating': 2.56}


In [153]:
#3.2
def enrich_sales_data(transactions, product_mapping):
    """
    Enriches transaction data with API product information

    Returns: list of enriched transaction dictionaries
    """
    enriched_transactions = []

    for t in transactions:
        enriched_txn = t.copy()  

        # Extract numeric product ID (P101 to 101)
        try:
            numeric_id = int(''.join(filter(str.isdigit, t['ProductID'])))
        except ValueError:
            numeric_id = None

        if numeric_id and numeric_id in product_mapping:
            api_info = product_mapping[numeric_id]

            enriched_txn['API_Category'] = api_info.get('category')
            enriched_txn['API_Brand'] = api_info.get('brand')
            enriched_txn['API_Rating'] = api_info.get('rating')
            enriched_txn['API_Match'] = True
        else:
            enriched_txn['API_Category'] = None
            enriched_txn['API_Brand'] = None
            enriched_txn['API_Rating'] = None
            enriched_txn['API_Match'] = False

        enriched_transactions.append(enriched_txn)

    return enriched_transactions


In [154]:
def save_enriched_data(enriched_transactions, filename='data/enriched_sales_data.txt'):
    """
    Saves enriched transactions back to a pipe-delimited file
    """
    header = [
        'TransactionID', 'Date', 'ProductID', 'ProductName',
        'Quantity', 'UnitPrice', 'CustomerID', 'Region',
        'API_Category', 'API_Brand', 'API_Rating', 'API_Match'
    ]

    try:
        with open(filename, 'w', encoding='utf-8') as file:
            file.write('|'.join(header) + '\n')

            for t in enriched_transactions:
                row = [
                    str(t.get('TransactionID', '')),
                    str(t.get('Date', '')),
                    str(t.get('ProductID', '')),
                    str(t.get('ProductName', '')),
                    str(t.get('Quantity', '')),
                    str(t.get('UnitPrice', '')),
                    str(t.get('CustomerID', '')),
                    str(t.get('Region', '')),
                    str(t.get('API_Category', '')),
                    str(t.get('API_Brand', '')),
                    str(t.get('API_Rating', '')),
                    str(t.get('API_Match', ''))
                ]

                file.write('|'.join(row) + '\n')

        print(f"Enriched sales data saved successfully to '{filename}'.")

    except Exception as e:
        print(f"Error saving enriched data: {e}")


In [155]:
api_products = fetch_all_products()
product_mapping = create_product_mapping(api_products)
enriched_data = enrich_sales_data(valid_txns, product_mapping)
save_enriched_data(enriched_data)


Successfully fetched 100 products.
Error saving enriched data: [Errno 2] No such file or directory: 'data/enriched_sales_data.txt'


In [156]:
#Q5 part 4

In [157]:
from datetime import datetime

def generate_sales_report(transactions, enriched_transactions, output_file='output/sales_report.txt'):
    """
    Generates a comprehensive formatted text report
    """

    # -------------------------
    # BASIC METRICS
    # -------------------------
    total_transactions = len(transactions)
    total_revenue = sum(t['Quantity'] * t['UnitPrice'] for t in transactions)
    avg_order_value = total_revenue / total_transactions if total_transactions else 0

    dates = sorted(t['Date'] for t in transactions)
    start_date, end_date = dates[0], dates[-1]

    # -------------------------
    # REGION-WISE PERFORMANCE
    # -------------------------
    region_stats = {}
    for t in transactions:
        region = t['Region']
        amount = t['Quantity'] * t['UnitPrice']

        if region not in region_stats:
            region_stats[region] = {'sales': 0, 'count': 0}

        region_stats[region]['sales'] += amount
        region_stats[region]['count'] += 1

    for r in region_stats:
        region_stats[r]['percentage'] = (region_stats[r]['sales'] / total_revenue) * 100

    region_stats = dict(sorted(
        region_stats.items(),
        key=lambda x: x[1]['sales'],
        reverse=True
    ))

    # -------------------------
    # TOP 5 PRODUCTS
    # -------------------------
    product_stats = {}
    for t in transactions:
        product = t['ProductName']
        amount = t['Quantity'] * t['UnitPrice']

        if product not in product_stats:
            product_stats[product] = {'qty': 0, 'revenue': 0}

        product_stats[product]['qty'] += t['Quantity']
        product_stats[product]['revenue'] += amount

    top_products = sorted(
        product_stats.items(),
        key=lambda x: x[1]['qty'],
        reverse=True
    )[:5]

    # -------------------------
    # TOP 5 CUSTOMERS
    # -------------------------
    customer_stats = {}
    for t in transactions:
        cid = t['CustomerID']
        amount = t['Quantity'] * t['UnitPrice']

        if cid not in customer_stats:
            customer_stats[cid] = {'spent': 0, 'orders': 0}

        customer_stats[cid]['spent'] += amount
        customer_stats[cid]['orders'] += 1

    top_customers = sorted(
        customer_stats.items(),
        key=lambda x: x[1]['spent'],
        reverse=True
    )[:5]

    # -------------------------
    # DAILY SALES TREND
    # -------------------------
    daily_stats = {}
    for t in transactions:
        date = t['Date']
        amount = t['Quantity'] * t['UnitPrice']

        if date not in daily_stats:
            daily_stats[date] = {'revenue': 0, 'count': 0, 'customers': set()}

        daily_stats[date]['revenue'] += amount
        daily_stats[date]['count'] += 1
        daily_stats[date]['customers'].add(t['CustomerID'])

    # -------------------------
    # PRODUCT PERFORMANCE
    # -------------------------
    best_day = max(
        daily_stats.items(),
        key=lambda x: x[1]['revenue']
    )

    low_products = [
        (p, d['qty'], d['revenue'])
        for p, d in product_stats.items()
        if d['qty'] < 10
    ]

    avg_value_region = {
        r: region_stats[r]['sales'] / region_stats[r]['count']
        for r in region_stats
    }

    # -------------------------
    # API ENRICHMENT SUMMARY
    # -------------------------
    total_enriched = len(enriched_transactions)
    success_count = sum(1 for t in enriched_transactions if t['API_Match'])
    failed_products = list({
        t['ProductName'] for t in enriched_transactions if not t['API_Match']
    })

    success_rate = (success_count / total_enriched) * 100 if total_enriched else 0

    # -------------------------
    # WRITE REPORT
    # -------------------------
    with open(output_file, 'w', encoding='utf-8') as file:

        file.write("="*44 + "\n")
        file.write("           SALES ANALYTICS REPORT\n")
        file.write(f"     Generated: {datetime.now()}\n")
        file.write(f"     Records Processed: {total_transactions}\n")
        file.write("="*44 + "\n\n")

        file.write("OVERALL SUMMARY\n")
        file.write("-"*44 + "\n")
        file.write(f"Total Revenue:        ₹{total_revenue:,.2f}\n")
        file.write(f"Total Transactions:   {total_transactions}\n")
        file.write(f"Average Order Value:  ₹{avg_order_value:,.2f}\n")
        file.write(f"Date Range:           {start_date} to {end_date}\n\n")

        file.write("REGION-WISE PERFORMANCE\n")
        file.write("-"*44 + "\n")
        file.write("Region    Sales         % Total   Transactions\n")
        for r, d in region_stats.items():
            file.write(f"{r:<8} ₹{d['sales']:>10,.0f}   {d['percentage']:>6.2f}%     {d['count']}\n")
        file.write("\n")

        file.write("TOP 5 PRODUCTS\n")
        file.write("-"*44 + "\n")
        for i, (p, d) in enumerate(top_products, 1):
            file.write(f"{i}. {p} | Qty: {d['qty']} | Revenue: ₹{d['revenue']:,.0f}\n")
        file.write("\n")

        file.write("TOP 5 CUSTOMERS\n")
        file.write("-"*44 + "\n")
        for i, (c, d) in enumerate(top_customers, 1):
            file.write(f"{i}. {c} | Spent: ₹{d['spent']:,.0f} | Orders: {d['orders']}\n")
        file.write("\n")

        file.write("DAILY SALES TREND\n")
        file.write("-"*44 + "\n")
        for d, v in sorted(daily_stats.items()):
            file.write(f"{d} | ₹{v['revenue']:,.0f} | {v['count']} | {len(v['customers'])}\n")
        file.write("\n")

        file.write("PRODUCT PERFORMANCE ANALYSIS\n")
        file.write("-"*44 + "\n")
        file.write(f"Best Selling Day: {best_day[0]} (₹{best_day[1]['revenue']:,.0f})\n")
        file.write("Low Performing Products:\n")
        for p in low_products:
            file.write(f"- {p[0]} (Qty: {p[1]})\n")
        file.write("\n")

        file.write("API ENRICHMENT SUMMARY\n")
        file.write("-"*44 + "\n")
        file.write(f"Total Products Enriched: {total_enriched}\n")
        file.write(f"Success Rate: {success_rate:.2f}%\n")
        file.write("Failed Products:\n")
        for p in failed_products:
            file.write(f"- {p}\n")

    print(f"Sales report generated successfully at '{output_file}'.")


In [158]:
generate_sales_report(
    valid_txns,
    enriched_data,
    output_file=r"C:\Users\Sneha\Downloads\output\sales_report.txt"
)


Sales report generated successfully at 'C:\Users\Sneha\Downloads\output\sales_report.txt'.


In [159]:
# 5.1
# IMPORTS
# ============================

import os
import requests
from datetime import datetime


def read_sales_data(filename):
    encodings = ['utf-8', 'latin-1', 'cp1252']
    for enc in encodings:
        try:
            with open(filename, 'r', encoding=enc) as f:
                lines = f.readlines()

            cleaned = []
            for line in lines:
                line = line.strip()
                if not line or line.startswith("TransactionID"):
                    continue
                cleaned.append(line)
            return cleaned

        except UnicodeDecodeError:
            continue
        except FileNotFoundError:
            print("File not found.")
            return []

    print("Unable to read file with supported encodings.")
    return []


def parse_transactions(raw_lines):
    transactions = []

    for line in raw_lines:
        parts = line.split("|")
        if len(parts) != 8:
            continue

        tid, date, pid, pname, qty, price, cid, region = parts
        pname = pname.replace(",", "")

        try:
            qty = int(qty.replace(",", ""))
            price = float(price.replace(",", ""))
        except ValueError:
            continue

        transactions.append({
            'TransactionID': tid,
            'Date': date,
            'ProductID': pid,
            'ProductName': pname,
            'Quantity': qty,
            'UnitPrice': price,
            'CustomerID': cid,
            'Region': region
        })

    return transactions


def validate_and_filter(transactions, region=None, min_amount=None, max_amount=None):
    valid = []
    invalid = 0

    for t in transactions:
        if (
            t['Quantity'] <= 0 or
            t['UnitPrice'] <= 0 or
            not t['TransactionID'].startswith("T") or
            not t['ProductID'].startswith("P") or
            not t['CustomerID'].startswith("C") or
            not t['Region']
        ):
            invalid += 1
            continue

        valid.append(t)

    filtered = []
    for t in valid:
        amount = t['Quantity'] * t['UnitPrice']

        if region and t['Region'] != region:
            continue
        if min_amount and amount < min_amount:
            continue
        if max_amount and amount > max_amount:
            continue

        filtered.append(t)

    summary = {
        'total_input': len(transactions),
        'invalid': invalid,
        'final_count': len(filtered)
    }

    return filtered, invalid, summary


# ============================
# Q3 – DATA ANALYSIS
# ============================

def calculate_total_revenue(transactions):
    return sum(t['Quantity'] * t['UnitPrice'] for t in transactions)


def region_wise_sales(transactions):
    stats = {}
    total = calculate_total_revenue(transactions)

    for t in transactions:
        r = t['Region']
        amt = t['Quantity'] * t['UnitPrice']
        stats.setdefault(r, {'total_sales': 0, 'count': 0})
        stats[r]['total_sales'] += amt
        stats[r]['count'] += 1

    for r in stats:
        stats[r]['percentage'] = round((stats[r]['total_sales'] / total) * 100, 2)

    return dict(sorted(stats.items(), key=lambda x: x[1]['total_sales'], reverse=True))


def top_selling_products(transactions, n=5):
    products = {}

    for t in transactions:
        p = t['ProductName']
        products.setdefault(p, {'qty': 0, 'rev': 0})
        products[p]['qty'] += t['Quantity']
        products[p]['rev'] += t['Quantity'] * t['UnitPrice']

    sorted_products = sorted(products.items(), key=lambda x: x[1]['qty'], reverse=True)
    return [(p, d['qty'], d['rev']) for p, d in sorted_products[:n]]


def customer_analysis(transactions):
    customers = {}

    for t in transactions:
        c = t['CustomerID']
        amt = t['Quantity'] * t['UnitPrice']
        customers.setdefault(c, {'spent': 0, 'orders': 0, 'products': set()})
        customers[c]['spent'] += amt
        customers[c]['orders'] += 1
        customers[c]['products'].add(t['ProductName'])

    result = {}
    for c, d in customers.items():
        result[c] = {
            'total_spent': d['spent'],
            'purchase_count': d['orders'],
            'avg_order_value': round(d['spent'] / d['orders'], 2),
            'products_bought': list(d['products'])
        }

    return dict(sorted(result.items(), key=lambda x: x[1]['total_spent'], reverse=True))


def daily_sales_trend(transactions):
    daily = {}

    for t in transactions:
        d = t['Date']
        daily.setdefault(d, {'revenue': 0, 'count': 0, 'customers': set()})
        daily[d]['revenue'] += t['Quantity'] * t['UnitPrice']
        daily[d]['count'] += 1
        daily[d]['customers'].add(t['CustomerID'])

    return {
        d: {
            'revenue': v['revenue'],
            'transaction_count': v['count'],
            'unique_customers': len(v['customers'])
        }
        for d, v in sorted(daily.items())
    }


def find_peak_sales_day(transactions):
    daily = daily_sales_trend(transactions)
    peak = max(daily.items(), key=lambda x: x[1]['revenue'])
    return peak[0], peak[1]['revenue'], peak[1]['transaction_count']


def low_performing_products(transactions, threshold=10):
    products = {}

    for t in transactions:
        p = t['ProductName']
        products.setdefault(p, {'qty': 0, 'rev': 0})
        products[p]['qty'] += t['Quantity']
        products[p]['rev'] += t['Quantity'] * t['UnitPrice']

    return sorted(
        [(p, d['qty'], d['rev']) for p, d in products.items() if d['qty'] < threshold],
        key=lambda x: x[1]
    )


# ============================
# Q4 – API INTEGRATION
# ============================

def fetch_all_products():
    try:
        r = requests.get("https://dummyjson.com/products?limit=100", timeout=10)
        r.raise_for_status()
        print("API products fetched")
        return r.json().get("products", [])
    except:
        print("API fetch failed")
        return []


def create_product_mapping(api_products):
    return {
        p['id']: {
            'category': p.get('category'),
            'brand': p.get('brand'),
            'rating': p.get('rating')
        }
        for p in api_products
    }


def enrich_sales_data(transactions, product_mapping):
    enriched = []

    for t in transactions:
        et = t.copy()
        try:
            pid = int(''.join(filter(str.isdigit, t['ProductID'])))
        except:
            pid = None

        if pid in product_mapping:
            et.update({
                'API_Category': product_mapping[pid]['category'],
                'API_Brand': product_mapping[pid]['brand'],
                'API_Rating': product_mapping[pid]['rating'],
                'API_Match': True
            })
        else:
            et.update({
                'API_Category': None,
                'API_Brand': None,
                'API_Rating': None,
                'API_Match': False
            })

        enriched.append(et)

    return enriched


def save_enriched_data(enriched, filename):
    os.makedirs(os.path.dirname(filename), exist_ok=True)

    with open(filename, 'w', encoding='utf-8') as f:
        header = enriched[0].keys()
        f.write("|".join(header) + "\n")

        for t in enriched:
            f.write("|".join(str(t[h]) for h in header) + "\n")


# ============================
# Q5 – REPORT GENERATION
# ============================

def generate_sales_report(transactions, enriched, output_file):
    os.makedirs(os.path.dirname(output_file), exist_ok=True)

    with open(output_file, 'w', encoding='utf-8') as f:
        f.write("SALES ANALYTICS REPORT\n")
        f.write(f"Generated: {datetime.now()}\n")
        f.write(f"Records: {len(transactions)}\n\n")

        revenue = calculate_total_revenue(transactions)
        f.write(f"Total Revenue: ₹{revenue:,.2f}\n")

    print("Report generated")


# ============================
# Q6 – MAIN APPLICATION
# ============================

def main():
    try:
        print("SALES ANALYTICS SYSTEM")

        raw = read_sales_data(r"C:\Users\Sneha\Downloads\sales_data.txt")
        parsed = parse_transactions(raw)
        valid, invalid, _ = validate_and_filter(parsed)

        api_products = fetch_all_products()
        mapping = create_product_mapping(api_products)
        enriched = enrich_sales_data(valid, mapping)

        save_enriched_data(enriched, r"C:\Users\Sneha\Downloads\data\enriched_sales_data.txt")
        generate_sales_report(valid, enriched, r"C:\Users\Sneha\Downloads\output\sales_report.txt")

        print("PROCESS COMPLETED SUCCESSFULLY")

    except Exception as e:
        print("ERROR:", e)


if __name__ == "__main__":
    main()


SALES ANALYTICS SYSTEM
API products fetched
Report generated
PROCESS COMPLETED SUCCESSFULLY
