# Digital Growth Performance - Generate Data for Streaming to Kinesis
**Streaming Field Name Descriptions** \
**event_time** - Timestamp of the record \
**platform** - Ad platform (Google Ads, TikTok, etc.) \
**campaign_id** - Unique campaign ID \
**event_type** - Type of event \
**session_id** - Unique user identifier \
**region** - User region \
**device_type** - Device used \
**product_type** - Product/service associated \
**conversions** - No. of successful outcomes

In [24]:
import boto3
import os
import json
from datetime import datetime
from faker import Faker
import random
from dotenv import load_dotenv

In [2]:
# Maps each ad platform name ("<brand> Ads") to the prefix placed in front of
# its campaign IDs ("<code>-"). Insertion order follows the pairs below.
PLATFORM_PREFIX_MAP = {
    f"{brand} Ads": f"{code}-"
    for brand, code in (
        ("Google", "GA"),
        ("Facebook", "FB"),
        ("TikTok", "TT"),
        ("YouTube", "YT"),
    )
}

In [None]:
# Shared Faker instance used to fabricate timestamps and session IDs.
fake = Faker()


# generate data for Kinesis Firehose stream
def get_data(platforms, i):
    """Build one mock ad-performance event.

    Args:
        platforms: Name of a single ad platform (e.g. "Google Ads"). Its
            campaign-ID prefix is looked up in PLATFORM_PREFIX_MAP.
        i: Integer sequence number; zero-padded to at least four digits and
            appended to the platform prefix to form the campaign ID.

    Returns:
        A dict with the keys event_time, platform, campaign_id, event_type,
        session_id, region, device_type and product_type.
    """
    # Fall back to an empty prefix so that an unmapped platform does not get
    # the literal text "None" formatted into its campaign ID.
    prefix = PLATFORM_PREFIX_MAP.get(platforms, "")
    campaign_id = f"{prefix}{i:04d}"

    return {
        # Faker-generated ISO 8601 timestamp (not the current wall-clock time).
        "event_time": fake.iso8601(),
        "platform": platforms,
        "campaign_id": campaign_id,
        "event_type": random.choice(["sign_up", "click", "conversion"]),
        "session_id": fake.uuid4(),
        "region": random.choice(["US", "EU", "APAC"]),
        "device_type": random.choice(["Mobile", "Tablet", "Desktop"]),
        # Key previously carried a stray trailing colon ("product_type:").
        "product_type": random.choice(["Subscription", "App", "Lead"]),
    }

In [None]:
# generate mock events and send them to Firehose
def generate_mock_stream(num_rows_per_platform=40):
    """Generate mock events for every platform and push them to Firehose.

    AWS credentials are read from the AWS_ACCESS_KEY and AWS_SECRET_KEY
    environment variables after loading "config.env" with python-dotenv.

    Args:
        num_rows_per_platform: Number of events to generate and send for each
            platform. Events are numbered 1..num_rows_per_platform.

    Returns:
        A list of the event dicts that were sent, in send order.

    Raises:
        AssertionError: If either credential environment variable is unset.
    """
    STREAM_NAME = "performance-firehose-to-s3"
    REGION = 'us-east-1'
    platforms = ["Google Ads", "Facebook Ads", "TikTok Ads", "YouTube Ads"]

    load_dotenv("config.env")

    access_id = os.getenv("AWS_ACCESS_KEY")
    secret_key = os.getenv("AWS_SECRET_KEY")

    assert access_id is not None, "AWS_ACCESS_KEY is not set"
    assert secret_key is not None, "AWS_SECRET_KEY is not set"

    firehose_client = boto3.client(
        service_name="firehose",
        region_name=REGION,
        aws_access_key_id=access_id,
        aws_secret_access_key=secret_key,
    )

    sent_events = []
    for platform in platforms:
        # range() accepts positional arguments only; the upper bound is +1 so
        # that exactly num_rows_per_platform events are produced per platform.
        for i in range(1, num_rows_per_platform + 1):
            data = get_data(platform, i)
            # Newline-terminate each record so records stay one JSON object per line.
            json_data = json.dumps(data) + '\n'

            # Send events to firehose
            firehose_client.put_record(
                DeliveryStreamName=STREAM_NAME,
                Record={'Data': json_data.encode("utf-8")},
            )
            sent_events.append(data)

    # Hand the events back so the caller can persist and count them.
    return sent_events

In [None]:
# Name the output file after today's date (events_YYYYMMDD.json).
json_file = f"events_{datetime.now():%Y%m%d}.json"

# Run the generator and keep whatever it returns for the local dump below.
data = generate_mock_stream(num_rows_per_platform=40)

# Write the result to disk as indented JSON.
with open(json_file, mode="w") as file:
    json.dump(data, file, indent=4)

row_count = len(data)
print(f"JSON file '{json_file}' created with {row_count} rows.")