In [8]:
# 세션 윈도우 테스트 초기화
import json
import random
from datetime import datetime, timedelta
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

In [2]:
# Redis 연결 및 키 확인
import redis
import json
from pprint import pprint

# Redis 클라이언트 생성
r = redis.Redis(host='localhost', port=6379, decode_responses=True)

try:
    r.ping()
    print('Redis 연결 성공')
except Exception as e:
    print(f"Redis 연결 실패: {e}")


Redis 연결 성공


In [43]:
# 1. 모든 user_features 키 조회
user_keys = r.keys("*")
for key in user_keys[:10]:  # 처음 10개만 표시
    print(f"- {key}")


- user_features:111


In [51]:

# 사용자 111: 하나의 긴 세션 (5분간 활동)
print("사용자 111 세션 생성 (단일 세션, 5분간 활동):")

user111_events = [
    {"type": "view", "offset_minutes": 0},
    {"type": "view", "offset_minutes": 1}, 
    {"type": "cart", "offset_minutes": 3},
    {"type": "purchase", "offset_minutes": 5}
]

# 기준 시간 설정
base_time = datetime.now()
print(base_time)

for event_info in user111_events:
    event_time = base_time + timedelta(minutes=event_info["offset_minutes"])
    
    event = {
        "event_time": event_time.isoformat(),
        "event_type": event_info["type"],
        "price": round(random.uniform(10, 500), 2),
        "user_id": "111",
    }
    
    producer.send("ecommerce-events", value=event)
    print(f"  {event_info['type']} - {event_time.strftime('%H:%M:%S')} (+{event_info['offset_minutes']}분)")

producer.flush()
print("✅ 사용자 111 이벤트 전송 완료\n")


사용자 111 세션 생성 (단일 세션, 5분간 활동):
2025-07-04 17:54:23.690540
  view - 17:54:23 (+0분)
  view - 17:55:23 (+1분)
  cart - 17:57:23 (+3분)
  purchase - 17:59:23 (+5분)
✅ 사용자 111 이벤트 전송 완료



In [59]:
event_time = datetime.now() + timedelta(minutes=10)
print(event_time)
event = {
        "event_time": event_time.isoformat(),
        "event_type": event_info["type"],
        "price": round(random.uniform(10, 500), 2),
        "user_id": "111",
    }
print(f"  {event_info['type']} - {event_time.strftime('%H:%M:%S')} 분)")
producer.send("ecommerce-events", value=event)
producer.flush()

2025-07-04 18:12:31.616868
  purchase - 18:12:31 분)


In [60]:
user_keys = r.keys("*")
for key in user_keys[:10]:  # 처음 10개만 표시
    print(f"- {key}")

- user_features:222
- session_features:111:1751619751805
- user_features:111


In [46]:
r.hgetall("session_features:111:1751619751805")

{'duration_seconds': '300', 'user_id': '111', 'event_count': '4'}

In [17]:
r.hgetall("session_features:111:1751556831315")

{'duration_seconds': '300', 'user_id': '111', 'event_count': '12'}

In [None]:
# 사용자 222: 두 개의 분리된 세션 (30분 간격으로 세션 분리)
print("사용자 222 세션 생성 (2개 세션으로 분리):")

user222_events = [
    {"type": "view", "offset_minutes": 0},
    {"type": "cart", "offset_minutes": 2},
    {"type": "view", "offset_minutes": 35},  # 30분 후 새 세션
]

for event_info in user222_events:
    event_time = base_time + timedelta(minutes=event_info["offset_minutes"])
    
    event = {
        "event_time": event_time.isoformat(),
        "event_type": event_info["type"],
        "price": round(random.uniform(10, 500), 2),
        "user_id": "222",
    }
    
    producer.send("ecommerce-events", value=event)
    
    if event_info["offset_minutes"] < 30:
        session_info = "첫 번째 세션"
    else:
        session_info = "두 번째 세션"
    
    print(f"  {event_info['type']} - {event_time.strftime('%H:%M:%S')} (+{event_info['offset_minutes']}분) [{session_info}]")

producer.flush()
print("✅ 사용자 222 이벤트 전송 완료\n")


In [47]:
# 사용자 333: 짧은 단일 세션
print("사용자 333 세션 생성 (단일 이벤트 세션):")

# 기준 시간 설정
base_time = datetime.now()
print(base_time)

user333_events = [
    {"type": "view", "offset_minutes": 0},
    {"type": "view", "offset_minutes": 2},
    {"type": "view", "offset_minutes": 5},
]

# user222 워터마크가 user333에 영향이 있는지 테스트
user222_events = [
    {"type": "view", "offset_minutes": 12}
]

for event_info in user333_events:
    event_time = base_time + timedelta(minutes=event_info["offset_minutes"])
    
    event = {
        "event_time": event_time.isoformat(),
        "event_type": event_info["type"],
        "price": round(random.uniform(10, 500), 2),
        "user_id": "333",
    }
    
    producer.send("ecommerce-events", value=event)
    print(f"  {event_info['type']} - {event_time.strftime('%H:%M:%S')} (+{event_info['offset_minutes']}분)")
print("✅ 사용자 333 이벤트 전송 완료\n")
    
for event_info in user222_events:
    event_time = base_time + timedelta(minutes=event_info["offset_minutes"])
    
    event = {
        "event_time": event_time.isoformat(),
        "event_type": event_info["type"],
        "price": round(random.uniform(10, 500), 2),
        "user_id": "222",
    }
    
    producer.send("ecommerce-events", value=event)
    print(f"  {event_info['type']} - {event_time.strftime('%H:%M:%S')} (+{event_info['offset_minutes']}분)")
print("✅ 사용자 222 이벤트 전송 완료\n")

producer.flush()


사용자 333 세션 생성 (단일 이벤트 세션):
2025-07-04 01:44:30.132454
  view - 01:44:30 (+0분)
  view - 01:46:30 (+2분)
  view - 01:49:30 (+5분)
✅ 사용자 333 이벤트 전송 완료

  view - 01:56:30 (+12분)
✅ 사용자 222 이벤트 전송 완료



In [26]:
r.hgetall("session_features:333:1751557631488")

{}

In [9]:

# 세션 테스트 완료 메시지
print("모든 세션 테스트 완료!")
print("\n기대 결과:")
print("- 사용자 111: 1개 세션 (4개 이벤트, 5분 지속)")
print("- 사용자 222: 2개 세션 (첫 번째 2개 이벤트, 두 번째 1개 이벤트)")
print("- 사용자 333: 1개 세션 (1개 이벤트)")
print("\nRedis에서 확인:")
print("- session_feature:111:* → 1개 키")
print("- session_feature:222:* → 2개 키")
print("- session_feature:333:* → 1개 키")

In [16]:
# 세션 피처 결과 확인
print("세션 피처 결과 확인:")
print("각 사용자별 세션 피처 키 목록:")

session_keys = r.keys("session_feature:*")
if session_keys:
    for key in session_keys:
        session_data = r.hgetall(key)
        print(f"\n키: {key}")
        print(f"  사용자: {session_data.get('user_id')}")
        print(f"  이벤트 수: {session_data.get('event_count')}")
        print(f"  지속 시간: {session_data.get('duration_seconds')}초")
else:
    print("세션 피처 키가 없습니다. Flink 작업이 실행 중인지 확인해주세요.")


=== 2025-07-03 21:37:00 이후 사용자 111 이벤트 ===
현재 시간: 2025-07-03 21:43:49
최대 3초간 읽습니다...
Kafka: 21:37:48.430 | Event: 21:37:48.429 | view | Product: 11997188
Kafka: 21:37:48.430 | Event: 21:37:58.429 | view | Product: 18744325
Kafka: 21:37:48.430 | Event: 21:38:08.429 | view | Product: 46672814
Kafka: 21:37:48.430 | Event: 21:38:18.429 | view | Product: 74994405
Kafka: 21:37:48.430 | Event: 21:38:28.429 | view | Product: 79591538
Kafka: 21:37:48.431 | Event: 21:38:38.429 | cart | Product: 67247315
Kafka: 21:37:48.431 | Event: 21:38:48.429 | cart | Product: 75348890
Kafka: 21:37:48.431 | Event: 21:38:58.429 | purchase | Product: 86300472
Kafka: 21:37:59.673 | Event: 21:37:59.672 | view | Product: 99999999

=== 요약 ===
총 9개 이벤트 발견

이벤트 타입별 집계:
  cart: 2개
  purchase: 1개
  view: 6개


In [4]:
# 추가 테스트 이벤트 (옵션)
# 필요시 특정 사용자의 추가 이벤트를 발행할 수 있습니다

def send_additional_event(user_id, event_type, offset_minutes=0):
    """추가 이벤트 발행 헬퍼 함수"""
    event_time = base_time + timedelta(minutes=offset_minutes)
    event = {
        "event_time": event_time.isoformat(),
        "event_type": event_type,
        "price": round(random.uniform(10, 500), 2),
        "user_id": user_id,
    }
    producer.send("ecommerce-events", value=event)
    producer.flush()
    print(f"추가 이벤트 전송: {user_id} - {event_type} - {event_time.strftime('%H:%M:%S')}")

# 사용법 예시 (필요시 주석 해제 후 실행):
# send_additional_event("111", "view", 15)  # 15분 후 추가 이벤트
# send_additional_event("222", "cart", 45)  # 45분 후 추가 이벤트 (새 세션)

{}