**Assignment**

In [1]:
# %pip installs into the environment of the running kernel and, unlike a bare
# `pip`, does not depend on IPython's automagic setting.
%pip install psycopg2

Note: you may need to restart the kernel to use updated packages.


In [None]:
import requests
import json
import psycopg2
from datetime import datetime, timedelta

# 1. Fetch Data
# The events file is newline-delimited JSON: one event object per line.
url = "https://storage.googleapis.com/xcc-de-assessment/events.json"
response = requests.get(url, timeout=60)  # never hang the notebook on a stalled download
# Fail loudly on HTTP errors instead of trying to parse an error page as JSON.
response.raise_for_status()
lines = response.text.splitlines()
# Skip blank lines, which json.loads would reject.
data = [json.loads(line) for line in lines if line.strip()]
# Keep only events that carry a customer id (treated here as logged-in customers);
# .get() also tolerates events where the key is absent altogether.
logged_in_data = [event for event in data if event["event"].get("customer-id") is not None]



In [None]:
# 2. Group data by customer id and sort by timestamp
# customers_data maps each customer id to the list of that customer's events.
customers_data = {}
for event in logged_in_data:
    customers_data.setdefault(event["event"]["customer-id"], []).append(event)

# Sort every customer's events chronologically, in place, so that consecutive
# events can be compared when the sessions are built.
for events in customers_data.values():
    events.sort(key=lambda evt: datetime.fromisoformat(evt["event"]["timestamp"]))



In [None]:
# 3. Sessionize based on inactivity period
# Assumption (from the assignment requirements): a new session starts after more
# than 30 minutes of inactivity between two consecutive events of a customer.
session_cutoff_duration = timedelta(minutes=30)
sessionized_data = []


def _session_rows(customer_id, session):
    """Flatten one session (a chronologically ordered list of events) into rows.

    Returns one dict per event. Every row repeats the session's start and end
    timestamps, taken from its first and last event, so the result can be stored
    in a single flat table. An empty session yields no rows.
    """
    if not session:
        return []
    session_start = session[0]["event"]["timestamp"]
    session_end = session[-1]["event"]["timestamp"]
    return [
        {
            "customer-id": customer_id,
            "session-start": session_start,
            "session-end": session_end,
            "event-id": session_event["id"],
            "event-type": session_event["type"],
            "event-useragent": session_event["event"].get("user-agent"),
            "event-ip": session_event["event"].get("ip"),
            "event-customer-id": session_event["event"].get("customer-id"),
            "event-timestamp": session_event["event"].get("timestamp"),
            "event-page": session_event["event"].get("page"),
        }
        for session_event in session
    ]


for customer_id, events in customers_data.items():
    current_session = []
    prev_timestamp = None  # parsed timestamp of the previous event; None before the first one
    for event in events:
        curr_timestamp = datetime.fromisoformat(event["event"]["timestamp"])
        # A gap longer than the cutoff closes the running session.
        if prev_timestamp is not None and curr_timestamp - prev_timestamp > session_cutoff_duration:
            sessionized_data.extend(_session_rows(customer_id, current_session))
            current_session = []
        current_session.append(event)
        prev_timestamp = curr_timestamp

    # Flush the last session that is still open for this customer (if any).
    sessionized_data.extend(_session_rows(customer_id, current_session))

In [None]:
# 4. Store in PostgreSQL database
import os

from psycopg2.extras import execute_batch

# Constants for database connection. Secrets come from the environment instead of
# being hard-coded in the notebook; the non-secret defaults match the local setup.
DB_NAME = os.environ.get("DB_NAME", "xcceleratedpostgresdb")
DB_USER = os.environ.get("DB_USER", "postgres")
# psycopg2 drops None-valued connection parameters, so when DB_PASSWORD is unset
# libpq falls back to its own password sources (PGPASSWORD, ~/.pgpass).
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_HOST = os.environ.get("DB_HOST", "localhost")  # not localhost if PostgreSQL runs in Docker
DB_PORT = os.environ.get("DB_PORT", "5432")
TABLE_NAME = "sessions"

# TABLE_NAME is a trusted constant, so interpolating it into the SQL text is safe;
# every row value goes through a %s placeholder.
# NOTE(review): customer_id is TEXT while event_id / event_customer_id are INTEGER;
# confirm the source ids really are integers, otherwise the inserts will fail.
CREATE_TABLE_SQL = f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
    customer_id TEXT,
    session_start TIMESTAMP,
    session_end TIMESTAMP,
    event_id INTEGER,
    event_type TEXT,
    event_useragent TEXT,
    event_ip TEXT,
    event_customer_id INTEGER,
    event_timestamp TIMESTAMP,
    event_page TEXT
)
"""

INSERT_SQL = f"""
INSERT INTO {TABLE_NAME} (
    customer_id, session_start, session_end,
    event_id, event_type, event_useragent,
    event_ip, event_customer_id, event_timestamp,
    event_page
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""

# sessionized_data already holds one flat dict per event, so each entry maps
# directly onto one table row (there is no nested "events" list to iterate).
rows = [
    (
        row["customer-id"],
        row["session-start"],
        row["session-end"],
        row["event-id"],
        row["event-type"],
        row["event-useragent"],
        row["event-ip"],
        row["event-customer-id"],
        row["event-timestamp"],
        row["event-page"],
    )
    for row in sessionized_data
]

# NOTE(review): the table has no key, so re-running this cell appends duplicate rows.
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD, host=DB_HOST, port=DB_PORT)
try:
    # `with conn` commits on success and rolls back if any statement fails;
    # the cursor context manager closes the cursor afterwards.
    with conn, conn.cursor() as c:
        c.execute(CREATE_TABLE_SQL)
        execute_batch(c, INSERT_SQL, rows)  # batches many INSERTs per server round trip
finally:
    conn.close()

In [None]:
# Sanity check on the number of rows: sessionization puts every logged-in event
# into exactly one session and emits one row per event, so the counts must match.
total_rows = len(sessionized_data)
assert total_rows == len(logged_in_data), (total_rows, len(logged_in_data))
total_rows


**Step 2: Publish the Results**

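1. Design the API: a single `GET /metrics/orders` endpoint that returns the median number of visits and the median session duration (in minutes) before a customer's order.
2. Implement the endpoint with Flask on top of the `sessions` table in PostgreSQL.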

In [None]:
# Flask serves the API; Flask-SQLAlchemy is imported by the API cells further down.
# %pip installs into the environment of the running kernel.
%pip install Flask Flask-SQLAlchemy psycopg2

In [None]:
import os

import psycopg2

# Define connection parameters. Secrets come from the environment rather than the
# notebook; psycopg2 drops a None password, so libpq then falls back to
# PGPASSWORD / ~/.pgpass.
DATABASE_SETTINGS = {
    "database": os.environ.get("DB_NAME", "xcceleratedpostgresdb"),
    "user": os.environ.get("DB_USER", "postgres"),
    "password": os.environ.get("DB_PASSWORD"),
    "host": os.environ.get("DB_HOST", "localhost"),
    "port": os.environ.get("DB_PORT", "5432"),
}

# Create a module-level connection and cursor object reused by later cells.
connection = psycopg2.connect(**DATABASE_SETTINGS)
cursor = connection.cursor()


In [None]:
# The `sessions` table stores one row per event and repeats the session boundaries
# (session_start, session_end) on every row, so a session is identified by
# (customer_id, session_start). A session counts as "before the order" when it
# started at or before the customer's first order event, so the ordering session is
# included. Both queries return rows of (customer_id, value).
# NOTE(review): 'ADD_PRODUCT_TO_CART' is assumed to indicate an order — confirm.

# SQL returning (customer_id, number of sessions up to the first order)
median_visits_sql = """
WITH OrdersSessions AS (
    SELECT customer_id, MIN(event_timestamp) AS first_order_time
    FROM sessions
    WHERE event_type = 'ADD_PRODUCT_TO_CART'  -- Assuming this indicates a purchase
    GROUP BY customer_id
)
SELECT
    s.customer_id,
    COUNT(DISTINCT s.session_start) AS total_sessions_before_order
FROM sessions AS s
JOIN OrdersSessions AS o ON s.customer_id = o.customer_id
WHERE s.session_start <= o.first_order_time
GROUP BY s.customer_id
"""

# SQL returning (customer_id, average duration of the sessions up to the first order).
# Sessions are de-duplicated first so long sessions are not weighted by their event count.
median_duration_sql = """
WITH OrdersSessions AS (
    SELECT customer_id, MIN(event_timestamp) AS first_order_time
    FROM sessions
    WHERE event_type = 'ADD_PRODUCT_TO_CART'  -- Assuming this indicates a purchase
    GROUP BY customer_id
),
SessionsBeforeOrder AS (
    SELECT DISTINCT s.customer_id, s.session_start, s.session_end
    FROM sessions AS s
    JOIN OrdersSessions AS o ON s.customer_id = o.customer_id
    WHERE s.session_start <= o.first_order_time
)
SELECT
    customer_id,
    AVG(session_end - session_start) AS avg_session_duration
FROM SessionsBeforeOrder
GROUP BY customer_id
"""


In [None]:
# %pip (unlike !pip) installs into the environment of the running kernel.
%pip install Flask

In [None]:
import statistics

import psycopg2
from flask import Flask, jsonify

app = Flask(__name__)


@app.route("/metrics/orders", methods=["GET"])
def metrics_orders():
    """Return the median visits and median session duration before an order.

    Runs ``median_visits_sql`` and ``median_duration_sql`` on the module-level
    ``cursor``. The second column of each result row holds the per-customer value
    (a session count, resp. an average duration as a timedelta). The medians are
    taken over those per-customer values.

    Returns:
        JSON with ``median_visits_before_order`` and
        ``median_session_duration_minutes_before_order`` (``null`` when a query
        returns no rows), or ``{"error": ...}`` with HTTP 500 on a database error.
    """
    # NOTE(review): the cursor is shared by all requests; psycopg2 cursors are not
    # meant to be used from several threads at once.
    try:
        # Fetch per-customer visit counts before the order
        cursor.execute(median_visits_sql)
        visits_data = [item[1] for item in cursor.fetchall()]

        # Fetch per-customer average session durations and convert them to minutes
        cursor.execute(median_duration_sql)
        duration_data = [item[1].total_seconds() / 60 for item in cursor.fetchall()]
    except psycopg2.Error as e:
        # A failed statement leaves the shared connection in an aborted transaction;
        # roll back so later requests can still run queries.
        cursor.connection.rollback()
        return jsonify({"error": str(e)}), 500

    # statistics.median averages the two middle values for even-sized inputs,
    # which plain indexing into the sorted list does not.
    median_visits = statistics.median(visits_data) if visits_data else None
    median_duration = statistics.median(duration_data) if duration_data else None

    return jsonify({
        "median_visits_before_order": median_visits,
        "median_session_duration_minutes_before_order": median_duration
    })

# Run the server on port 5000
if __name__ == "__main__":
    app.run(port=5000)


In [None]:
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
import psycopg2

import os

app = Flask(__name__)


@app.route('/metrics/orders', methods=['GET'])
def get_order_metrics():
    """Return the median visits and median session duration before a customer's first order.

    Medians are taken over customers that have an order event:
    - visits: number of distinct sessions that started at or before the first order;
    - duration: the customer's average duration, in minutes, of those sessions.

    Responds with ``median_visits_before_order`` and
    ``median_session_duration_minutes_before_order`` (``null`` when no customer
    has ordered), or with ``{"error": ...}`` and HTTP 500 on a database error.
    """
    # Connection details to PostgreSQL; secrets come from the environment.
    # psycopg2 drops a None password, so libpq falls back to PGPASSWORD / ~/.pgpass.
    DB_NAME = os.environ.get("DB_NAME", "xcceleratedpostgresdb")
    DB_USER = os.environ.get("DB_USER", "postgres")
    DB_PASSWORD = os.environ.get("DB_PASSWORD")
    DB_HOST = os.environ.get("DB_HOST", "localhost")
    DB_PORT = os.environ.get("DB_PORT", "5432")

    # The `sessions` table stores one row per event with the session boundaries
    # repeated, so sessions are de-duplicated on (customer_id, session_start, session_end).
    # NOTE(review): 'ADD_PRODUCT_TO_CART' is assumed to mark an order — confirm.
    before_order_ctes = """
    WITH first_orders AS (
        SELECT customer_id, MIN(event_timestamp) AS first_order_time
        FROM sessions
        WHERE event_type = 'ADD_PRODUCT_TO_CART'
        GROUP BY customer_id
    ),
    sessions_before_order AS (
        SELECT DISTINCT s.customer_id, s.session_start, s.session_end
        FROM sessions AS s
        JOIN first_orders AS fo ON fo.customer_id = s.customer_id
        WHERE s.session_start <= fo.first_order_time
    )
    """

    # SQL query to calculate median visits before order
    median_visits_query = before_order_ctes + """
    SELECT
        percentile_cont(0.5) WITHIN GROUP (ORDER BY visits) AS median_visits
    FROM (
        SELECT customer_id, COUNT(*) AS visits
        FROM sessions_before_order
        GROUP BY customer_id
    ) AS per_customer
    """

    # SQL query to calculate median session duration (minutes) before order
    median_duration_query = before_order_ctes + """
    SELECT
        percentile_cont(0.5) WITHIN GROUP (ORDER BY avg_minutes) AS median_duration_minutes
    FROM (
        SELECT
            customer_id,
            EXTRACT(EPOCH FROM AVG(session_end - session_start))::double precision / 60.0
                AS avg_minutes
        FROM sessions_before_order
        GROUP BY customer_id
    ) AS per_customer
    """

    # Bound before `try` so the cleanup in `finally` is safe even if connecting fails.
    conn = None
    cursor = None
    try:
        # Connect to the PostgreSQL database
        conn = psycopg2.connect(
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            port=DB_PORT
        )
        cursor = conn.cursor()

        cursor.execute(median_visits_query)
        median_visits = cursor.fetchone()[0]

        cursor.execute(median_duration_query)
        median_duration_minutes = cursor.fetchone()[0]

        # Return the metrics as JSON response
        metrics = {
            "median_visits_before_order": median_visits,
            "median_session_duration_minutes_before_order": median_duration_minutes
        }
        status = 200

    except psycopg2.Error as e:
        # Connection and query errors are reported to the client as a server error.
        metrics = {"error": str(e)}
        status = 500

    finally:
        # Close the cursor and connection
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    return jsonify(metrics), status


if __name__ == "__main__":
    app.run(debug=True, use_reloader=False)

In [None]:
import datetime
import psycopg2
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy

# Database configuration. Secrets come from the environment rather than being
# hard-coded; the non-secret defaults match the local setup.
import os
from urllib.parse import quote_plus

DB_NAME = os.environ.get("DB_NAME", "xcceleratedpostgresdb")
DB_USER = os.environ.get("DB_USER", "postgres")
# None is dropped by psycopg2, letting libpq fall back to PGPASSWORD / ~/.pgpass.
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = os.environ.get("DB_PORT", "5432")

# Create Flask app
app = Flask(__name__)

# SQLAlchemy configuration for PostgreSQL. User name and password are
# percent-encoded because characters such as '@' or ':' would otherwise be
# misread as URL delimiters.
_db_credentials = quote_plus(DB_USER)
if DB_PASSWORD:
    _db_credentials += ":" + quote_plus(DB_PASSWORD)
app.config['SQLALCHEMY_DATABASE_URI'] = f"postgresql://{_db_credentials}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# Initialize SQLAlchemy for the app
db = SQLAlchemy(app)


# Model for the 'sessions' table
class Session(db.Model):
    """SQLAlchemy model mapped to the ``public.sessions`` table.

    NOTE(review): the CREATE TABLE statement earlier in this notebook defines
    customer_id (TEXT), session_start, session_end and event_* columns, and no
    session_id / start_time / end_time / duration columns. Queries through this
    model would therefore fail against that table; confirm which schema is
    intended. The endpoint in this cell queries through psycopg2, not this model.
    """

    __tablename__ = 'sessions'
    __table_args__ = {'schema': 'public'}

    session_id = db.Column(db.Integer, primary_key=True)
    customer_id = db.Column(db.Integer)
    start_time = db.Column(db.DateTime)
    end_time = db.Column(db.DateTime)
    duration = db.Column(db.Integer)

    def as_dict(self):
        """Return this row's column values keyed by column name, in table order."""
        return dict(
            (column.name, getattr(self, column.name))
            for column in self.__table__.columns
        )


def get_db_connection():
    """Open and return a new psycopg2 connection built from the module-level DB_* settings.

    The connection is not closed here; the caller is responsible for closing it.
    """
    connection_kwargs = {
        "dbname": DB_NAME,
        "user": DB_USER,
        "password": DB_PASSWORD,
        "host": DB_HOST,
        "port": DB_PORT,
    }
    return psycopg2.connect(**connection_kwargs)


@app.route('/metrics/orders', methods=['GET'])
def get_order_metrics():
    """
    Return metrics on median visits and session duration before an order.

    Medians are taken over customers that have an order event:
    - visits: number of distinct sessions that started at or before the first order;
    - duration: the customer's average duration, in minutes, of those sessions.

    Responds with ``median_visits_before_order`` and
    ``median_session_duration_minutes_before_order`` (``null`` when no customer
    has ordered), or with ``{"error": ...}`` and HTTP 500 on a database error.
    """
    # The `sessions` table stores one row per event with the session boundaries
    # repeated, so sessions are de-duplicated on (customer_id, session_start, session_end).
    # NOTE(review): 'ADD_PRODUCT_TO_CART' is assumed to mark an order — confirm.
    before_order_ctes = """
    WITH first_orders AS (
        SELECT customer_id, MIN(event_timestamp) AS first_order_time
        FROM sessions
        WHERE event_type = 'ADD_PRODUCT_TO_CART'
        GROUP BY customer_id
    ),
    sessions_before_order AS (
        SELECT DISTINCT s.customer_id, s.session_start, s.session_end
        FROM sessions AS s
        JOIN first_orders AS fo ON fo.customer_id = s.customer_id
        WHERE s.session_start <= fo.first_order_time
    )
    """

    # SQL queries to calculate metrics
    median_visits_query = before_order_ctes + """
    SELECT
        percentile_cont(0.5) WITHIN GROUP (ORDER BY visits) AS median_visits
    FROM (
        SELECT customer_id, COUNT(*) AS visits
        FROM sessions_before_order
        GROUP BY customer_id
    ) AS per_customer
    """

    median_duration_query = before_order_ctes + """
    SELECT
        percentile_cont(0.5) WITHIN GROUP (ORDER BY avg_minutes) AS median_duration_minutes
    FROM (
        SELECT
            customer_id,
            EXTRACT(EPOCH FROM AVG(session_end - session_start))::double precision / 60.0
                AS avg_minutes
        FROM sessions_before_order
        GROUP BY customer_id
    ) AS per_customer
    """

    # Bound before `try` so the cleanup in `finally` is safe even if connecting fails.
    conn = None
    cursor = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        cursor.execute(median_visits_query)
        median_visits = cursor.fetchone()[0]

        cursor.execute(median_duration_query)
        median_duration_minutes = cursor.fetchone()[0]

        metrics = {
            "median_visits_before_order": median_visits,
            "median_session_duration_minutes_before_order": median_duration_minutes
        }
        status = 200

    except psycopg2.Error as e:
        # Connection and query errors are reported to the client as a server error.
        metrics = {"error": str(e)}
        status = 500

    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

    return jsonify(metrics), status


# Start the Flask development server with debug mode on. The reloader is turned
# off, presumably because its process restart does not work inside a notebook kernel.
# NOTE(review): debug mode is for local development only.
if __name__ == "__main__":
    app.run(use_reloader=False, debug=True)
