# Final Project | Data Engineering Bootcamp | Enrique Arce

## 1. Generative AI Data

### Import Libraries and Faker

In [None]:
!pip install faker



In [None]:
import faker
import pandas as pd
import random
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from datetime import datetime

# Initializing faker
fake = faker.Faker()

#Creating Spark Sesion
spark = SparkSession.builder.appName("SalesGeneration").getOrCreate()

### 1. Products



In [None]:
# Settings of products' categories
categories = {"Electronics": 10, "Clothing": 8, "Home & Kitchen": 9, "Sports": 11, "Books": 12}
total_products = sum(categories.values())

In [None]:
categories = {
    "Electronics": 10,
    "Clothing": 8,
    "Home & Kitchen": 9,
    "Sports": 11,
    "Books": 12
}

price_ranges = {
    "Electronics": (500, 4500),
    "Clothing": (10, 300),
    "Home & Kitchen": (50, 1000),
    "Sports": (20, 1500),
    "Books": (5, 100)
}

brand_names = ["Samsung", "Nike", "IKEA", "Wilson", "Sony", "Apple", "Adidas", "Philips", "Reebok", "Canon"]
product_types = {
    "Electronics": ["Smartphone", "Wireless Headphones", "Laptop", "Tablet", "Smartwatch"],
    "Clothing": ["Running Shoes", "Sports Jacket", "Casual T-Shirt", "Denim Jeans", "Winter Coat"],
    "Home & Kitchen": ["Wooden Table", "Air Fryer", "Blender", "Coffee Maker", "Microwave Oven"],
    "Sports": ["Tennis Racket", "Football", "Basketball", "Training Shorts", "Gym Gloves"],
    "Books": ["Mystery Novel", "Self-Help Guide", "Science Fiction Book", "Biography", "Cooking Recipe Book"]
}

products = []
product_id_counter = 1

for category, count in categories.items():
    for _ in range(count):
        product_id = f"P-{product_id_counter:03d}"
        product_id_counter += 1
        brand = random.choice(brand_names)
        product_name = f"{brand} {random.choice(product_types[category])}"
        price = round(random.uniform(*price_ranges[category]), 1)

        products.append({
            "product_id": product_id,
            "name": product_name,
            "category": category,
            "price": price
        })

products_df = pd.DataFrame(products)
products_df.to_csv("products.csv", index=False)

print(products_df.head())

  product_id                       name     category   price
0      P-001  Apple Wireless Headphones  Electronics   513.5
1      P-002              Adidas Tablet  Electronics   774.5
2      P-003          Adidas Smartwatch  Electronics  2316.3
3      P-004               Canon Laptop  Electronics  4081.7
4      P-005               Canon Tablet  Electronics   752.5


### 2. Customers



In [None]:
num_customers = 13543


region_city_mapping = {
    "Santa Cruz": ["Santa Cruz de la Sierra", "Montero", "Warnes"],
    "La Paz": ["La Paz", "El Alto", "Achocalla"],
    "Cochabamba": ["Cochabamba", "Quillacollo", "Sacaba"]
}

region_weights = {"Santa Cruz": 0.37, "La Paz": 0.32, "Cochabamba": 0.31}

# IDs
customer_ids = set()
while len(customer_ids) < num_customers:
    customer_ids.add(fake.unique.random_number(digits=6, fix_len=True))
customer_ids = list(customer_ids)


customers = []
for customer_id in customer_ids:
    name = fake.name()
    first_name, last_name = name.split()[:2]  # Tomamos los primeros dos nombres
    email = f"{first_name.lower()}.{last_name.lower()}@{fake.free_email_domain()}"

    region = random.choices(list(region_weights.keys()), weights=region_weights.values(), k=1)[0]
    city = random.choice(region_city_mapping[region])

    customers.append({
        "customer_id": customer_id,
        "name": name,
        "email": email,
        "signup_date": fake.date_between(start_date="-5y", end_date="today"),
        "region": region,
        "city": city
    })


customers_df = pd.DataFrame(customers)
customers_df.to_csv("customers.csv", index=False)
print(customers_df.head())

   customer_id               name                          email signup_date  \
0       753666  Rebekah Carpenter  rebekah.carpenter@hotmail.com  2022-10-03   
1       884739        Kellie Khan          kellie.khan@yahoo.com  2023-04-02   
2       327684       Miguel Tapia       miguel.tapia@hotmail.com  2025-03-17   
3       327690        Mary Rivera          mary.rivera@gmail.com  2021-09-09   
4       262155     Robert Griffin       robert.griffin@yahoo.com  2020-09-26   

       region       city  
0      La Paz  Achocalla  
1      La Paz     La Paz  
2  Santa Cruz     Warnes  
3      La Paz  Achocalla  
4      La Paz     La Paz  


### 3. Sales

In [None]:

store_ids = set()
while len(store_ids) < 500:
    store_ids.add(fake.unique.random_number(digits=6, fix_len=True))
store_ids = list(store_ids)

# -------------------------------SALES----------------------------------
num_sales = 936297


region_weights = {"Santa Cruz": 0.37, "La Paz": 0.32, "Cochabamba": 0.31}
sales_per_region = {region: int(num_sales * weight) for region, weight in region_weights.items()}

popular_products = {region: products_df["product_id"].tolist() for region in region_weights}
all_products = products_df["product_id"].tolist()


stores_per_region = {region: random.sample(store_ids, k=int(500 * region_weights[region])) for region in region_weights}


sales = []

for region, num_sales_region in sales_per_region.items():
    region_customers = customers_df[customers_df["region"] == region][["customer_id"]]

    for _ in range(num_sales_region):
        sale_id = fake.unique.random_number(digits=9, fix_len=True)

        customer_id = region_customers.sample(1).iloc[0]["customer_id"]


        store_id = random.choice(stores_per_region[region])


        product_id = random.choice(popular_products[region]) if random.random() < 0.7 else random.choice(all_products)

        quantity = random.randint(1, 5)
        sale_date = fake.date_between(start_date="-5y", end_date="today")

        sales.append({
            "sale_id": sale_id,
            "store_id": store_id,
            "product_id": product_id,
            "customer_id": customer_id,
            "quantity": quantity,
            "sale_date": sale_date
        })

sales_df = pd.DataFrame(sales)
sales_df.to_csv("sales.csv", index=False)

print(sales_df.head())

     sale_id  store_id product_id  customer_id  quantity   sale_date
0  961677193    802779      P-027       726948         2  2020-06-08
1  323017237    941256      P-021       413831         4  2025-02-10
2  693795100    612044      P-041       907065         5  2022-07-13
3  500816712    675245      P-001       107894         2  2020-11-29
4  557407685    353247      P-032       491512         1  2020-05-01


### 4. Stores


In [None]:

region_city_mapping = {
    "Santa Cruz": ["Santa Cruz de la Sierra", "Montero", "Warnes"],
    "La Paz": ["La Paz", "El Alto", "Achocalla"],
    "Cochabamba": ["Cochabamba", "Quillacollo", "Sacaba"]
}


lead_time_mapping = {
    "Santa Cruz": random.randint(2, 5),
    "La Paz": random.randint(3, 7),
    "Cochabamba": random.randint(4, 6)
}

stores = []
used_store_ids = set()

for store_id in store_ids:
    if store_id in used_store_ids:
        continue

    region = random.choice(list(region_city_mapping.keys()))
    city = random.choice(region_city_mapping[region])

    stores.append({
        "store_id": store_id,
        "city": city,
        "region": region
    })

    used_store_ids.add(store_id)

stores_df = pd.DataFrame(stores)
stores_df.to_csv("stores.csv", index=False)


print(f"Total de tiendas únicas: {stores_df['store_id'].nunique()}")

print(stores_df.head())

Total de tiendas únicas: 500
   store_id         city      region
0    669698    Achocalla      La Paz
1    264194      Montero  Santa Cruz
2    710660  Quillacollo  Cochabamba
3    591883      Montero  Santa Cruz
4    380945      Montero  Santa Cruz


### 5. Stocks

In [None]:

low_stock_threshold = 10


product_sales_counts = sales_df["product_id"].value_counts().to_dict()


stocks = []
stock_id_counter = 1

for store_id in store_ids:
    for product_id in products_df["product_id"]:
        base_stock = min(product_sales_counts.get(product_id, 5), 200)
        stock_level = int(base_stock * random.uniform(1.2, 6.0))

        stock_level = max(120, min(600, stock_level))

        stocks.append({
            "stock_id": stock_id_counter,
            "store_id": store_id,
            "product_id": product_id,
            "stock_level": stock_level,
            "low_stock_alert": 1 if stock_level < low_stock_threshold else 0
        })

        stock_id_counter += 1


stocks_df = pd.DataFrame(stocks)
stocks_df.to_csv("stocks.csv", index=False)


print(f"Total de registros en Stocks: {stocks_df.shape[0]}")
print(f"Total de stock_id únicos: {stocks_df['stock_id'].nunique()}")
print(stocks_df.head())

Total de registros en Stocks: 25000
Total de stock_id únicos: 25000
   stock_id  store_id product_id  stock_level  low_stock_alert
0         1    669698      P-001          600                0
1         2    669698      P-002          600                0
2         3    669698      P-003          600                0
3         4    669698      P-004          600                0
4         5    669698      P-005          600                0


## 2. Streaming system to input data

### Instaling the GCP&S service

In [None]:
!pip install google-cloud-pubsub



### User authentication for COLAB in GCP environment

In [1]:
import os
from google.oauth2 import service_account
from google.cloud import pubsub_v1


key_path = "/content/inventoryoptimization-455112-946c0b30b97c.json"

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path

credentials = service_account.Credentials.from_service_account_file(key_path)
print("Autenticado correctamente como:", credentials.service_account_email)


Autenticado correctamente como: colab-35@inventoryoptimization-455112.iam.gserviceaccount.com


### Envío de mensaje PUB/SUB

In [3]:
from google.cloud import pubsub_v1, bigquery
import json
from datetime import datetime
import random

# Settiings
project_id = "inventoryoptimization-455112"
topic_id = "sales-topic"
dataset_id = "optimization_Inventory"
sales_table = "fact_sales"
products_table = "dim_products"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
bq_client = bigquery.Client()


query = f"SELECT sale_id FROM `{project_id}.{dataset_id}.{sales_table}`"
existing_sales = {row.sale_id for row in bq_client.query(query)}

while True:
    sale_id = random.randint(100000, 999999)
    if sale_id not in existing_sales:
        break

product_id = "P-002"
quantity = 100

query_price = f"""
SELECT price
FROM `{project_id}.{dataset_id}.{products_table}`
WHERE product_id = '{product_id}'
"""
price_result = bq_client.query(query_price).result()
price = next(iter(price_result), None)

if price:
    total_sale_value = quantity * price.price
else:
    raise ValueError(f"❌ No se encontró precio para el producto {product_id}")


sale_event = {
    "sale_id": sale_id,
    "store_id": 931781,
    "product_id": product_id,
    "customer_id": 350611,
    "quantity": quantity,
    "sale_date": "2025-03-31",
    "total_sale_value": total_sale_value
}


message_data = json.dumps(sale_event).encode("utf-8")
future = publisher.publish(topic_path, message_data)
message_id = future.result()

print(f"✅ Mensaje enviado con ID: {message_id}")
print(f"📤 Datos enviados:\n{json.dumps(sale_event, indent=2)}")

✅ Mensaje enviado con ID: 14042639305058040
📤 Datos enviados:
{
  "sale_id": 844954,
  "store_id": 931781,
  "product_id": "P-002",
  "customer_id": 350611,
  "quantity": 100,
  "sale_date": "2025-03-31",
  "total_sale_value": 77450.0
}
