# Event Stream Processing with SQLite

In this exercise, you will process a simulated stream of events and save the results into an SQLite database. This task will test your ability to handle streaming data, manage database interactions, and write clean, modular Python code.

## The Event Stream

You will simulate a stream of events where each event is a JSON object containing the following fields:
- `event_id`: Unique identifier for the event
- `event_type`: Type of the event (e.g., 'purchase', 'click')
- `event_timestamp`: Timestamp when the event occurred
- `user_id`: Identifier of the user who triggered the event
- `details`: A dictionary containing additional details relevant to the event type (serialized to a JSON string before it is stored)
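
To make the field list concrete, here is a minimal sketch of one event and its JSON form. The values are illustrative only (they mirror the sample event used later in Step 2), and `sample_event` is a hypothetical name, not part of the exercise.

```python
import json

# Illustrative values only; a real stream would generate these.
sample_event = {
    'event_id': '123456',
    'event_type': 'purchase',
    'event_timestamp': '2024-08-09T12:34:56.789012',
    'user_id': '9876',
    'details': {'amount': 50.0},
}

# Serialize the whole event to a JSON object, then decode it again.
encoded = json.dumps(sample_event)
decoded = json.loads(encoded)

print(encoded)
print(decoded['details']['amount'])
```

The round trip returns an equal dictionary, which is why `details` can safely be kept as JSON text in a `TEXT` column and decoded on read.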

## SQLite Database

You will store the processed events in an SQLite database. The database will have a single table named `events`, with the following schema:
- `event_id` (TEXT PRIMARY KEY)
- `event_type` (TEXT)
- `event_timestamp` (TEXT)
- `user_id` (TEXT)
- `details` (TEXT)
- `processed_at` (TEXT): new field that stores the time at which the event was processed
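
One way to check that a table matches this schema is to create it and read the column list back with `PRAGMA table_info`. The sketch below assumes an in-memory database so that it does not touch `events.db`; `create_events_table` and `demo_conn` are hypothetical names introduced for illustration.

```python
import sqlite3

# Hypothetical helper for illustration; the name is not from the exercise.
def create_events_table(connection):
    """Create the events table described above if it does not already exist."""
    connection.execute('''
        CREATE TABLE IF NOT EXISTS events (
            event_id TEXT PRIMARY KEY,
            event_type TEXT,
            event_timestamp TEXT,
            user_id TEXT,
            details TEXT,
            processed_at TEXT
        )
    ''')
    connection.commit()

# An in-memory database keeps the sketch from touching events.db.
demo_conn = sqlite3.connect(':memory:')
create_events_table(demo_conn)

# PRAGMA table_info returns one row per column:
# (cid, name, type, notnull, dflt_value, pk)
columns = [(row[1], row[2], row[5])
           for row in demo_conn.execute('PRAGMA table_info(events)')]
for name, col_type, is_pk in columns:
    print(name, col_type, 'PRIMARY KEY' if is_pk else '')

demo_conn.close()
```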

## Objectives

1. Write a Python function to simulate a stream of events.
2. Modify each event to add a new field `processed_at` before storing it in the database.
3. Process each event and save it to the SQLite database.
4. Implement error handling and logging.
5. Query the database to retrieve and display the stored events.

In [None]:
import sqlite3
import json
import logging
from datetime import datetime
import random
import time

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Create an SQLite database
conn = sqlite3.connect('events.db')
c = conn.cursor()

# Create the events table
c.execute('''
    CREATE TABLE IF NOT EXISTS events (
        event_id TEXT PRIMARY KEY,
        event_type TEXT,
        event_timestamp TEXT,
        user_id TEXT,
        details TEXT,
        processed_at TEXT
    )
''')

conn.commit()
logging.info('Database and table created successfully.')


## Step 1: Simulate Event Stream

Write a Python function to simulate a stream of events. Each event should be a JSON object containing the fields described above. The function should generate events at random intervals and yield them one by one.

In [None]:
def simulate_event_stream(num_events=10):
    """Yield num_events simulated events, pausing briefly between them."""
    event_types = ['purchase', 'click', 'signup']
    for _ in range(num_events):
        # Pick the type once so that `details` agrees with `event_type`
        event_type = random.choice(event_types)
        amount = random.uniform(10.0, 100.0) if event_type == 'purchase' else None
        event = {
            'event_id': str(random.randint(100000, 999999)),
            'event_type': event_type,
            'event_timestamp': datetime.now().isoformat(),
            'user_id': str(random.randint(1000, 9999)),
            'details': json.dumps({'amount': amount})
        }
        yield event
        time.sleep(random.uniform(0.1, 0.5))  # Simulate event arrival time

# Example usage
event_generator = simulate_event_stream(10)
for event in event_generator:
    print(event)
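
Because `event_id` is the primary key of the `events` table, random six-digit IDs can occasionally collide over many events, and a collision makes the later insert fail. A minimal sketch of one alternative, assuming UUID strings are acceptable as identifiers (the exercise does not require this): `simulate_unique_events` and its `seed` parameter are hypothetical, and the arrival delay is omitted to keep the sketch short.

```python
import json
import random
import uuid
from datetime import datetime

# Hypothetical variant for illustration; not part of the exercise's required API.
def simulate_unique_events(num_events=10, seed=None):
    """Yield events whose event_id is a UUID4 string instead of a random integer."""
    rng = random.Random(seed)  # Private generator: event types repeat for a given seed
    event_types = ['purchase', 'click', 'signup']
    for _ in range(num_events):
        event_type = rng.choice(event_types)
        # Only purchases carry an amount; other event types store None.
        amount = round(rng.uniform(10.0, 100.0), 2) if event_type == 'purchase' else None
        yield {
            'event_id': str(uuid.uuid4()),  # UUIDs are not affected by the seed
            'event_type': event_type,
            'event_timestamp': datetime.now().isoformat(),
            'user_id': str(rng.randint(1000, 9999)),
            'details': json.dumps({'amount': amount}),
        }

unique_events = list(simulate_unique_events(5, seed=42))
for unique_event in unique_events:
    print(unique_event['event_id'], unique_event['event_type'])
```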


## Step 2: Modify the Events

Write a Python function to modify each event by adding a `processed_at` field. This field should store the current timestamp when the event is being processed.

In [None]:
def modify_event(event):
    event['processed_at'] = datetime.now().isoformat()
    return event

# Example usage
event = {
    'event_id': '123456',
    'event_type': 'purchase',
    'event_timestamp': '2024-08-09T12:34:56.789012',
    'user_id': '9876',
    'details': json.dumps({'amount': 50.0})
}
modified_event = modify_event(event)
print(modified_event)
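
Note that `modify_event` changes the dictionary it receives and records local time without a time zone. If either matters for your pipeline, a non-mutating variant with a UTC timestamp could look like the sketch below. `with_processed_at` and its `now` parameter are hypothetical names, and the fixed time is used only to make the example deterministic.

```python
from datetime import datetime, timezone

# Hypothetical alternative to modify_event, shown for comparison only.
def with_processed_at(event, now=None):
    """Return a copy of the event with processed_at set, leaving the input untouched."""
    if now is None:
        now = datetime.now(timezone.utc)
    return {**event, 'processed_at': now.isoformat()}

original = {'event_id': '123456', 'event_type': 'click'}
fixed_time = datetime(2024, 8, 9, 12, 0, 0, tzinfo=timezone.utc)
stamped = with_processed_at(original, now=fixed_time)

print(stamped['processed_at'])      # 2024-08-09T12:00:00+00:00
print('processed_at' in original)   # False
```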


## Step 3: Process and Store Events

Write a function to process each modified event and store it in the SQLite database. The function should handle any exceptions and log any errors that occur during processing.

In [None]:
def process_and_store_event(event, db_connection):
    """Insert one processed event; log the error and roll back if the insert fails."""
    try:
        cursor = db_connection.cursor()
        cursor.execute('''
            INSERT INTO events (event_id, event_type, event_timestamp, user_id, details, processed_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (
            event['event_id'],
            event['event_type'],
            event['event_timestamp'],
            event['user_id'],
            event['details'],
            event['processed_at']
        ))
        db_connection.commit()
        logging.info(f"Event {event['event_id']} stored successfully.")
    except sqlite3.Error as e:
        db_connection.rollback()  # Discard the failed insert so the connection stays usable
        logging.error(f"Error storing event {event['event_id']}: {e}")

# Example usage
for event in simulate_event_stream(10):
    modified_event = modify_event(event)
    process_and_store_event(modified_event, conn)
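
A common failure in this step is inserting an `event_id` that already exists, which violates the primary key and raises `sqlite3.IntegrityError`. The sketch below shows one way to treat that case as "already stored" instead of a generic error. It assumes an in-memory database, and `store_event_once` and `demo_conn` are hypothetical names.

```python
import sqlite3

# Hypothetical helper for illustration; not part of the exercise's required API.
def store_event_once(event, db_connection):
    """Insert the event; return False if its event_id is already stored."""
    try:
        # Used as a context manager, the connection commits on success
        # and rolls back if the block raises.
        with db_connection:
            db_connection.execute(
                '''
                INSERT INTO events
                    (event_id, event_type, event_timestamp, user_id, details, processed_at)
                VALUES
                    (:event_id, :event_type, :event_timestamp, :user_id, :details, :processed_at)
                ''',
                event,
            )
    except sqlite3.IntegrityError:
        return False
    return True

demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('''
    CREATE TABLE events (
        event_id TEXT PRIMARY KEY, event_type TEXT, event_timestamp TEXT,
        user_id TEXT, details TEXT, processed_at TEXT
    )
''')
demo_event = {
    'event_id': '123456', 'event_type': 'purchase',
    'event_timestamp': '2024-08-09T12:34:56.789012', 'user_id': '9876',
    'details': '{"amount": 50.0}', 'processed_at': '2024-08-09T12:35:00',
}
first = store_event_once(demo_event, demo_conn)
second = store_event_once(demo_event, demo_conn)  # Same event_id: rejected
row_count = demo_conn.execute('SELECT COUNT(*) FROM events').fetchone()[0]
print(first, second, row_count)
demo_conn.close()
```

Returning a boolean lets the caller decide whether a duplicate should be logged as a warning or silently skipped.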


## Step 4: Query the Database

Write a function to query the database and retrieve the stored events. Display the results in a formatted way.

In [None]:
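def query_events(db_connection):
    """Print every stored event, one per line, ordered by processing time."""
    cursor = db_connection.cursor()
    cursor.execute('''
        SELECT event_id, event_type, event_timestamp, user_id, details, processed_at
        FROM events
        ORDER BY processed_at
    ''')
    for event_id, event_type, event_timestamp, user_id, details, processed_at in cursor:
        print(f"{event_id} | {event_type:<8} | user {user_id} | {event_timestamp} | {details} | processed {processed_at}")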

# Example usage
query_events(conn)
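
If the stored events need further processing rather than only display, it can help to return them as dictionaries and decode `details` back from JSON. The following is a minimal sketch under that assumption. `fetch_events` and `demo_conn` are hypothetical names, and the single inserted row is sample data.

```python
import json
import sqlite3

# Hypothetical helper for illustration; not part of the exercise's required API.
def fetch_events(db_connection):
    """Return stored events as dictionaries, with details decoded from JSON."""
    cursor = db_connection.execute('SELECT * FROM events ORDER BY event_timestamp')
    # cursor.description holds one tuple per column; item 0 is the column name.
    column_names = [column[0] for column in cursor.description]
    events = []
    for row in cursor:
        record = dict(zip(column_names, row))
        record['details'] = json.loads(record['details'])
        events.append(record)
    return events

demo_conn = sqlite3.connect(':memory:')
demo_conn.execute('''
    CREATE TABLE events (
        event_id TEXT PRIMARY KEY, event_type TEXT, event_timestamp TEXT,
        user_id TEXT, details TEXT, processed_at TEXT
    )
''')
demo_conn.execute(
    'INSERT INTO events VALUES (?, ?, ?, ?, ?, ?)',
    ('123456', 'purchase', '2024-08-09T12:34:56.789012', '9876',
     json.dumps({'amount': 50.0}), '2024-08-09T12:35:00'),
)
demo_conn.commit()

fetched = fetch_events(demo_conn)
for record in fetched:
    print(f"{record['event_id']}: {record['event_type']} by user {record['user_id']}, "
          f"amount={record['details']['amount']}")
demo_conn.close()
```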


## Step 5: Clean Up

Close the database connection and perform any necessary cleanup operations.

In [None]:
# Close the database connection
conn.close()
logging.info('Database connection closed.')
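
If an earlier cell raises an exception, an explicit `conn.close()` at the end may never run. One option is to wrap the connection in `contextlib.closing`, which closes it when the block exits for any reason. This is a sketch using an in-memory database and a cut-down table, with `demo_conn` as a hypothetical name.

```python
import sqlite3
from contextlib import closing

# closing() calls demo_conn.close() when the with-block exits, even on error.
with closing(sqlite3.connect(':memory:')) as demo_conn:
    demo_conn.execute('CREATE TABLE events (event_id TEXT PRIMARY KEY)')
    demo_conn.execute("INSERT INTO events VALUES ('123456')")
    demo_conn.commit()
    stored = demo_conn.execute('SELECT COUNT(*) FROM events').fetchone()[0]

# Using a closed connection raises sqlite3.ProgrammingError.
try:
    demo_conn.execute('SELECT 1')
    is_closed = False
except sqlite3.ProgrammingError:
    is_closed = True

print(stored, is_closed)  # 1 True
```

Using the connection object itself in a `with` statement manages a transaction (commit or rollback), but does not close the connection, so `closing` is still needed for cleanup.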
