In [0]:
%pip install faker

Python interpreter will be restarted.
Python interpreter will be restarted.


Importing necessary libraries

In [0]:
import pandas as pd
import numpy as np
from faker import Faker
import uuid
import random
from datetime import datetime, timedelta



Generate Products Data

In [0]:
# Shared Faker generator used by every generate_* function in this notebook.
fake = Faker('en_US')

# Seed every seedable random source the notebook draws from so a fresh run is repeatable:
#   * Faker's shared generator (covers fake.* providers and fake.random.*)
#   * NumPy's global state, which pandas DataFrame.sample() falls back to when it is
#     called without random_state (all the dirty-data injection steps do this).
# uuid.uuid4() identifiers are not controlled by these seeds and still differ between runs.
Faker.seed(12345)
np.random.seed(12345)

def generate_products(n):
    """Build a DataFrame holding ``n`` synthetic product records.

    Columns, in order: product_id (UUID4), sku (unique EAN-13), product_name,
    description, category, brand, price, weight, in_stock, is_active, created_at.

    Dirty data is injected on purpose: after generation a random 5% sample of rows
    has its category overwritten with 'Invalid'. 'Invalid' is also one of the
    categories drawn at random, so the final share of 'Invalid' rows can exceed 5%.

    Args:
        n: Number of product rows to generate.

    Returns:
        pandas.DataFrame with one row per product.
    """
    categories = [
        'Electronics', 'Clothing', 'Home & Garden', 'Books', 'Toys', 'Sports',
        'Cosmetics', 'Invalid', 'Automotive', 'Jewelry', 'Food & Beverages',
        'Pet Supplies', 'Office Supplies', 'Health & Wellness', 'Music', 'Movies',
        'Furniture', 'Outdoor & Camping', 'Art & Crafts', 'Baby Products',
    ]

    def column(make_value):
        # One freshly generated value per product row.
        return [make_value() for _ in range(n)]

    # Columns are generated one after another, in this order, so the sequence of
    # random draws is the same as listing the comprehensions inline.
    product_columns = {
        'product_id': column(lambda: uuid.uuid4()),
        'sku': column(lambda: fake.unique.ean13()),
        'product_name': column(lambda: fake.catch_phrase()),
        'description': column(lambda: fake.paragraph()),
        'category': column(lambda: fake.random_element(elements=categories)),
        'brand': column(lambda: fake.company()),
        'price': column(lambda: round(fake.random.uniform(5, 500), 2)),
        'weight': column(lambda: round(fake.random.uniform(0.1, 20), 2)),
        'in_stock': column(lambda: fake.random.randint(0, 1000)),
        'is_active': column(lambda: fake.boolean(chance_of_getting_true=90)),
        'created_at': column(lambda: fake.date_time_this_year()),
    }
    products_df = pd.DataFrame(product_columns)

    # Overwrite the category of a random 5% of rows with 'Invalid'.
    invalid_rows = products_df.sample(frac=0.05).index
    products_df.loc[invalid_rows, 'category'] = 'Invalid'
    return products_df

Generate Customers Data

In [0]:
def generate_customers(n):
    """Build a DataFrame holding ``n`` synthetic customer records.

    Columns, in order: customer_id (UUID4), first_name, last_name, email,
    phone_number, address, city, state, zip_code, country (always
    'United States'), date_of_birth, gender, registration_date, last_login.

    Dirty data is injected on purpose after generation: a random 10% sample of
    rows loses its email (set to NaN) and a random 5% sample gets the zip code
    'INVALID'.

    Args:
        n: Number of customer rows to generate.

    Returns:
        pandas.DataFrame with one row per customer.
    """
    # (column name, zero-argument value factory) pairs; the list order is both the
    # column order and the order in which the random values are drawn.
    column_makers = [
        ('customer_id', lambda: uuid.uuid4()),
        ('first_name', lambda: fake.first_name()),
        ('last_name', lambda: fake.last_name()),
        ('email', lambda: fake.email()),
        ('phone_number', lambda: fake.phone_number()),
        ('address', lambda: fake.street_address()),
        ('city', lambda: fake.city()),
        ('state', lambda: fake.state()),
        ('zip_code', lambda: fake.zipcode()),
        ('country', lambda: 'United States'),
        ('date_of_birth', lambda: fake.date_of_birth(minimum_age=18, maximum_age=90)),
        ('gender', lambda: fake.random_element(elements=('M', 'F', 'Other'))),
        ('registration_date', lambda: fake.date_time_this_decade()),
        ('last_login', lambda: fake.date_time_this_month()),
    ]
    customers_df = pd.DataFrame({
        column_name: [make_value() for _ in range(n)]
        for column_name, make_value in column_makers
    })

    # Blank out the email of a random 10% of customers.
    missing_email_rows = customers_df.sample(frac=0.1).index
    customers_df.loc[missing_email_rows, 'email'] = np.nan

    # Mark the zip code of a random 5% of customers as invalid.
    invalid_zip_rows = customers_df.sample(frac=0.05).index
    customers_df.loc[invalid_zip_rows, 'zip_code'] = 'INVALID'
    return customers_df

Generate Promotions Data

In [0]:
def generate_promotions(n):
    """Build a DataFrame holding ``n`` synthetic promotion records.

    Columns, in order: promotion_id (UUID4), name, description, discount_type
    ('Percentage' or 'Fixed Amount'), discount_value, start_date, end_date.
    Each end_date is drawn with ``fake.date_between`` from the window that starts
    at the promotion's start_date and ends 30 days later.

    Dirty data is injected on purpose: a random 10% sample of rows has its
    discount_value set to NaN.

    Args:
        n: Number of promotion rows to generate.

    Returns:
        pandas.DataFrame with one row per promotion.
    """
    # Start dates are drawn first because the end dates depend on them.
    start_dates = [fake.date_time_this_year() for _ in range(n)]

    promotion_columns = {}
    promotion_columns['promotion_id'] = [uuid.uuid4() for _ in range(n)]
    promotion_columns['name'] = [fake.catch_phrase() for _ in range(n)]
    promotion_columns['description'] = [fake.paragraph() for _ in range(n)]
    promotion_columns['discount_type'] = [
        fake.random_element(elements=('Percentage', 'Fixed Amount')) for _ in range(n)
    ]
    promotion_columns['discount_value'] = [
        round(fake.random.uniform(5, 50), 2) for _ in range(n)
    ]
    promotion_columns['start_date'] = start_dates

    # End dates are generated last, after every other column has been drawn.
    end_dates = []
    for begins_at in start_dates:
        latest_end = begins_at + timedelta(days=30)
        end_dates.append(fake.date_between(start_date=begins_at, end_date=latest_end))
    promotion_columns['end_date'] = end_dates

    promotions_df = pd.DataFrame(promotion_columns)

    # Remove the discount value from a random 10% of promotions.
    missing_discount_rows = promotions_df.sample(frac=0.1).index
    promotions_df.loc[missing_discount_rows, 'discount_value'] = np.nan

    return promotions_df

Generate Orders Data

In [0]:
def generate_orders(customers, promotions, n):
    """Build a DataFrame holding ``n`` synthetic orders.

    Every order references a randomly chosen customer and a randomly chosen
    promotion. Columns, in order: order_id (UUID4), customer_id, order_date,
    status, total_amount, promotion_id, payment_method, shipping_cost.

    Dirty data is injected on purpose after generation: a random 10% sample of
    rows has total_amount set to NaN and a random 5% sample has payment_method
    overwritten with 'Invalid'. 'Invalid' is also one of the randomly drawn
    payment methods, so the final share of 'Invalid' rows can exceed 5%.

    Args:
        customers: DataFrame with a 'customer_id' column to draw customers from.
        promotions: DataFrame with a 'promotion_id' column to draw promotions from.
        n: Number of order rows to generate.

    Returns:
        pandas.DataFrame with one row per order.

    Raises:
        IndexError: If ``n`` > 0 and either ID column is empty.
    """
    payment_methods = ['Credit Card', 'Debit Card', 'PayPal', 'Bank Transfer', 'Cash on Delivery','Invalid']
    order_statuses = ('Pending', 'Processing', 'Shipped', 'Delivered', 'Cancelled')

    # random.choice picks seq[i] for a random position i. On a pandas Series that is a
    # label lookup, which only matches the position when the frame has a default
    # RangeIndex. Plain lists make the choice purely positional (and much cheaper).
    customer_ids = customers['customer_id'].tolist()
    promotion_ids = promotions['promotion_id'].tolist()

    orders_df = pd.DataFrame({
        'order_id': [uuid.uuid4() for _ in range(n)],
        'customer_id': [fake.random.choice(customer_ids) for _ in range(n)],
        'order_date': [fake.date_time_this_year() for _ in range(n)],
        'status': [fake.random_element(elements=order_statuses) for _ in range(n)],
        'total_amount': [round(fake.random.uniform(10, 1000), 2) for _ in range(n)],
        'promotion_id': [fake.random.choice(promotion_ids) for _ in range(n)],
        'payment_method': [fake.random_element(elements=payment_methods) for _ in range(n)],
        'shipping_cost': [round(fake.random.uniform(5, 50), 2) for _ in range(n)]
    })
    orders_df.loc[orders_df.sample(frac=0.1).index, 'total_amount'] = np.nan  # 10% of total amounts are missing
    orders_df.loc[orders_df.sample(frac=0.05).index, 'payment_method'] = 'Invalid'  # a further 5% of payment methods are forced to invalid
    return orders_df

In [0]:
# Generate each dataset and cast its UUID key columns to strings in the same step.

# Products - 10000 unique products
products_df = generate_products(n=10000).astype({'product_id': str})

# Customers - 1000 unique customers
customers_df = generate_customers(n=1000).astype({'customer_id': str})

# Promotions - 50
promotions_df = generate_promotions(n=50).astype({'promotion_id': str})

# Orders - 30000, each pointing at one of the customers and promotions generated above
orders_df = generate_orders(customers_df, promotions_df, n=30000).astype({
    'order_id': str,
    'customer_id': str,
    'promotion_id': str,
})

In [0]:
# Populate shipping information in orders from the randomly selected customer.
# A per-row iterrows() loop that filters customers_df for every order scans the whole
# customer table once per order; a keyed lookup does the same work in one pass per column.
shipping_column_sources = {
    'shipping_address': 'address',
    'shipping_city': 'city',
    'shipping_state': 'state',
    'shipping_zip': 'zip_code',
    'shipping_country': 'country',
}

# Keep the first row per customer_id so the lookup index is unique
# (same "first match wins" rule as selecting .iloc[0] of the filtered frame).
customer_lookup = customers_df.drop_duplicates(subset='customer_id').set_index('customer_id')

for shipping_column, customer_column in shipping_column_sources.items():
    orders_df[shipping_column] = orders_df['customer_id'].map(customer_lookup[customer_column])

In [0]:
# Pick a reproducible 20% sample of the orders; these will ship to an address that
# differs from the customer's own address.
subset = orders_df.sample(frac=0.2, random_state=42)

# Shipping column -> zero-argument factory producing its replacement value.
# Columns are regenerated one at a time, row by row, in the order listed here.
replacement_factories = (
    ('shipping_address', lambda: fake.street_address()),
    ('shipping_city', lambda: fake.city()),
    ('shipping_state', lambda: fake.state()),
    ('shipping_zip', lambda: fake.zipcode()),
    ('shipping_country', lambda: 'United States'),
)
for shipping_column, make_value in replacement_factories:
    subset[shipping_column] = subset.apply(lambda _row, make=make_value: make(), axis=1)

# Write the modified rows back into the full orders frame (aligned on the index).
orders_df.update(subset)

Generate Order Line Items Data

In [0]:
def generate_order_line_items(orders, products, n):
    """Build a DataFrame holding ``n`` synthetic order line items.

    Every line item references a randomly chosen order and a randomly chosen
    product and gets a quantity between 1 and 10. ``unit_price`` and ``subtotal``
    are placeholders (0.0) that the caller is expected to fill in.

    Args:
        orders: DataFrame with an 'order_id' column to draw orders from.
        products: DataFrame with a 'product_id' column to draw products from.
        n: Number of line-item rows to generate.

    Returns:
        pandas.DataFrame with columns line_item_id (UUID4), order_id, product_id,
        quantity, unit_price, subtotal.

    Raises:
        IndexError: If ``n`` > 0 and either ID column is empty.
    """
    # random.choice picks seq[i] for a random position i. On a pandas Series that is a
    # label lookup, which only matches the position when the frame has a default
    # RangeIndex. Plain lists make the choice purely positional (and much cheaper).
    order_ids = orders['order_id'].tolist()
    product_ids = products['product_id'].tolist()

    df = pd.DataFrame({
        'line_item_id': [uuid.uuid4() for _ in range(n)],
        'order_id': [fake.random.choice(order_ids) for _ in range(n)],
        'product_id': [fake.random.choice(product_ids) for _ in range(n)],
        'quantity': [fake.random.randint(1, 10) for _ in range(n)],
        # Float placeholders: these columns later receive fractional prices, and
        # writing floats into an integer column is deprecated in recent pandas.
        'unit_price': [0.0 for _ in range(n)],
        'subtotal': [0.0 for _ in range(n)]
    })

    return df

In [0]:
# Generate order line items data

order_line_items_df = generate_order_line_items(orders_df, products_df, 60000)

# Copy each product's catalogue price onto its line items with one keyed lookup.
# (A per-row iterrows() loop that filters products_df scans the whole product table
# once per line item.) The first row per product_id wins, matching an .iloc[0] lookup.
price_by_product = (
    products_df.drop_duplicates(subset='product_id').set_index('product_id')['price']
)
order_line_items_df['unit_price'] = order_line_items_df['product_id'].map(price_by_product)

# The subtotal is computed from the complete data, before any values are blanked out
# below, so it stays populated even on rows whose quantity or unit_price goes missing.
order_line_items_df['subtotal'] = order_line_items_df['quantity'] * order_line_items_df['unit_price']

# quantity is an integer column; cast it to float explicitly before injecting NaN
# instead of relying on pandas to upcast silently on assignment.
order_line_items_df['quantity'] = order_line_items_df['quantity'].astype(float)
order_line_items_df.loc[order_line_items_df.sample(frac=0.1).index, 'quantity'] = np.nan  # 10% of quantities are missing
order_line_items_df.loc[order_line_items_df.sample(frac=0.1).index, 'unit_price'] = np.nan  # 10% of unit prices are missing


# Convert identifier columns to strings
order_line_items_df['line_item_id'] = order_line_items_df['line_item_id'].astype(str)
order_line_items_df['order_id'] = order_line_items_df['order_id'].astype(str)
order_line_items_df['product_id'] = order_line_items_df['product_id'].astype(str)

In [0]:
# Configure access to ADLS Gen2 and convert the generated pandas frames to Spark DataFrames.
# (Nothing is read from storage here; the frames are written out in the next cell.)
import os

from pyspark.sql import SparkSession

# Storage settings. Prefer environment variables so the account key never has to be
# pasted into (and saved with) the notebook; the placeholders are used only when the
# corresponding variable is not set.
storage_account_name = os.environ.get("AZURE_STORAGE_ACCOUNT_NAME", "<azure_storage_account_name>")
storage_account_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "<azure_storage_account_key>")
container_name = os.environ.get("AZURE_CONTAINER_NAME", "<azure_container_name>")

# Set up the Spark configuration: register the account key for the storage endpoint
spark = SparkSession.builder.getOrCreate()
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", storage_account_key)

# Define the ADLS Gen2 path; every dataset is written below this landing folder
adls_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/landing/"


# Convert each pandas DataFrame to a Spark DataFrame
products = spark.createDataFrame(products_df)
customers = spark.createDataFrame(customers_df)
orders = spark.createDataFrame(orders_df)
orderLineItems = spark.createDataFrame(order_line_items_df)
promotions = spark.createDataFrame(promotions_df)

In [0]:
# Write every dataset as CSV (with a header row) into its own folder under the landing
# path. Each frame is coalesced to a single partition first, and 'append' mode adds to
# whatever a previous run already wrote instead of replacing it.
landing_targets = {
    'products': products,
    'customers': customers,
    'orders': orders,
    'orderLineItems': orderLineItems,
    'promotions': promotions,
}
for folder_name, spark_frame in landing_targets.items():
    (
        spark_frame.coalesce(1)
        .write.format('csv')
        .mode('append')
        .option('header', 'true')
        .save(adls_path + folder_name)
    )


Script to download CSV files using a link

In [0]:

# import base64
# from IPython.display import HTML

# spark = SparkSession.builder.getOrCreate()
# # df = spark.read.csv("products.csv", header=True, inferSchema=True)
# # pandas_df = df.toPandas()
# csv_data = order_line_items_df.to_csv(index=False)
# b64 = base64.b64encode(csv_data.encode()).decode()
# href = f'<a href="data:text/csv;base64,{b64}" download="lineitems.csv">Click here to download file from DBFS</a>'
# HTML(href)

