In [0]:
import random
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, BooleanType, TimestampType, DateType

# Session used by every generator below; getOrCreate() hands back an already
# running session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Data Generation")
    .getOrCreate()
)

def random_date(start, end):
    """Return `start` moved forward by a random whole number of seconds.

    The offset is drawn with ``random.randint`` from 0 up to the number of
    whole seconds between `start` and `end` (both bounds inclusive); any
    fractional second in the span is truncated before drawing.
    """
    span_seconds = int((end - start).total_seconds())
    offset_seconds = random.randint(0, span_seconds)
    return start + timedelta(seconds=offset_seconds)

# Generate test data for the last 7-day period.
# The clock is read once and both bounds are derived from that single reading,
# so the window is exactly seven days long (two separate datetime.now() calls
# would make it seven days plus the time elapsed between the calls).
end_date = datetime.now()
start_date = end_date - timedelta(days=7)

def make_items(base_item_id):
    """Build ten consecutive item records, a random few of them flagged ordered.

    Item ids run from `base_item_id` to `base_item_id + 9`. Between one and
    three distinct positions are picked at random and get ``is_order`` set to
    True; every other record gets False.
    """
    # Draw the count first, then the positions, so the sequence of calls into
    # the random module stays fixed for a given seed.
    ordered_count = random.randint(1, 3)
    ordered_positions = set(random.sample(range(10), ordered_count))

    return [
        {
            "item_id": base_item_id + position,
            "is_order": position in ordered_positions,
        }
        for position in range(10)
    ]
  
def generate_impressions():
    """Build a ten-row DataFrame of sample ranking impressions.

    Row ``i`` (0-9) carries:
      * ``dt``: ``start_date`` plus ``i % 7`` days, formatted ``YYYY-MM-DD``
      * ``ranking_id``: the string ``r<i>``
      * ``customer_id``: a random integer in 1000-1020
      * ``impressions``: the records from ``make_items(200 + i * 10)``

    All columns, and both fields of each impression record, are declared
    non-nullable.
    """
    item_struct = StructType([
        StructField("item_id", IntegerType(), nullable=False),
        StructField("is_order", BooleanType(), nullable=False),
    ])
    schema = StructType([
        StructField("dt", StringType(), nullable=False),
        StructField("ranking_id", StringType(), nullable=False),
        StructField("customer_id", IntegerType(), nullable=False),
        StructField("impressions", ArrayType(item_struct), nullable=False),
    ])

    rows = []
    for row_no in range(10):
        # The day offset wraps after a week, so rows 7-9 share dates with rows 0-2.
        day = start_date + timedelta(days=row_no % 7)
        rows.append({
            "dt": day.strftime("%Y-%m-%d"),
            "ranking_id": f"r{row_no}",
            "customer_id": random.randint(1000, 1020),
            "impressions": make_items(200 + row_no * 10),
        })

    return spark.createDataFrame(rows, schema=schema)
  
def generate_clicks():
    """Build a ten-row DataFrame of sample click events.

    Row ``i`` (0-9) carries:
      * ``dt``: ``start_date`` plus ``i % 7`` days, formatted ``YYYY-MM-DD``
      * ``customer_id``: a random integer in 1000-1020
      * ``item_id``: a random integer in 200-210
      * ``click_time``: a random timestamp between ``start_date`` and ``end_date``

    NOTE(review): ``dt`` follows the row index while ``click_time`` is drawn
    independently, so the two need not fall on the same calendar day --
    confirm that is intended.
    """
    schema = StructType([
        StructField("dt", StringType(), nullable=False),
        StructField("customer_id", IntegerType(), nullable=False),
        StructField("item_id", IntegerType(), nullable=False),
        StructField("click_time", TimestampType(), nullable=False),
    ])

    rows = []
    for row_no in range(10):
        day = start_date + timedelta(days=row_no % 7)
        rows.append({
            "dt": day.strftime("%Y-%m-%d"),
            "customer_id": random.randint(1000, 1020),
            "item_id": random.randint(200, 210),
            "click_time": random_date(start_date, end_date),
        })

    return spark.createDataFrame(rows, schema=schema)

def generate_add_to_cart():
    """Build a ten-row DataFrame of sample add-to-cart events.

    Row ``i`` (0-9) carries a ``dt`` of ``start_date`` plus ``i % 7`` days
    (``YYYY-MM-DD``), a random ``customer_id`` in 1000-1020, a random
    ``config_id`` in 200-210, a random ``simple_id`` in 1-5 and an
    ``occurred_at`` timestamp drawn between ``start_date`` and ``end_date``.
    Every column is declared non-nullable.
    """
    def build_row(index):
        # Values are produced in column order so the sequence of random draws
        # per row is fixed.
        day = start_date + timedelta(days=index % 7)
        return {
            "dt": day.strftime("%Y-%m-%d"),
            "customer_id": random.randint(1000, 1020),
            "config_id": random.randint(200, 210),
            "simple_id": random.randint(1, 5),
            "occurred_at": random_date(start_date, end_date),
        }

    rows = [build_row(index) for index in range(10)]

    schema = StructType([
        StructField("dt", StringType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("config_id", IntegerType(), False),
        StructField("simple_id", IntegerType(), False),
        StructField("occurred_at", TimestampType(), False),
    ])
    return spark.createDataFrame(rows, schema=schema)
  
def generate_previous_orders():
    """Build a ten-row DataFrame of sample previous-order records.

    Row ``i`` (0-9) carries an ``order_date`` equal to the date part of
    ``start_date`` plus ``i % 7`` days, a random ``customer_id`` in 1000-1020,
    a random ``config_id`` in 200-210, a random ``simple_id`` in 1-5 and an
    ``occurred_at`` timestamp drawn between ``start_date`` and ``end_date``.
    Every column is declared non-nullable.
    """
    def build_row(index):
        # Values are produced in column order so the sequence of random draws
        # per row is fixed.
        day = start_date + timedelta(days=index % 7)
        return {
            "order_date": day.date(),
            "customer_id": random.randint(1000, 1020),
            "config_id": random.randint(200, 210),
            "simple_id": random.randint(1, 5),
            "occurred_at": random_date(start_date, end_date),
        }

    rows = [build_row(index) for index in range(10)]

    schema = StructType([
        StructField("order_date", DateType(), False),
        StructField("customer_id", IntegerType(), False),
        StructField("config_id", IntegerType(), False),
        StructField("simple_id", IntegerType(), False),
        StructField("occurred_at", TimestampType(), False),
    ])
    return spark.createDataFrame(rows, schema=schema)