In [2]:
# Imports
import pathlib
import os
from dataclasses import dataclass, field
from typing import Dict, Literal, Optional, Tuple, List
import json
import collections

import imutils
import numpy as np
import cv2
from dataclasses_json import DataClassJsonMixin
import pandas as pd
from tqdm import tqdm

# Constants 
CWD = pathlib.Path(os.path.abspath(""))
GIT_ROOT = CWD.parent
DATA_DIR = GIT_ROOT / "data" / 'PhotosynthesisFall2022'

In [3]:
# Game state reconstruction routines
@dataclass
class Participant(DataClassJsonMixin):
    id: str
    position: Tuple[float, float]
    state: Literal['null', 'H2O', 'CO2', 'Sugar', 'O2', 'Thinking_H2O'] = 'null'

@dataclass
class EnvironmentState(DataClassJsonMixin):
    sun_state: Optional[bool] = None

@dataclass
class GameState(DataClassJsonMixin):
    participants: Dict[str, Participant] = field(default_factory=dict)
    environment: EnvironmentState = field(default_factory=EnvironmentState)

@dataclass
class GamePhaseState(DataClassJsonMixin):
    phase: Literal['before_game', 'molecule_select', 'normal', 'paused']

# Additional for state-action sequence generation
@dataclass
class StateAction(DataClassJsonMixin):
    timestamp: float
    phase: Literal['molecule_select', 'normal']
    state: Literal['H2O', 'CO2', 'Sugar', 'O2', 'Thinking_H2O']
    action: Literal['rabbit', 'choro', 'roots', 'plant']
    out_state: Literal['H2O', 'CO2', 'Sugar', 'O2', 'Thinking_H2O']
    success: bool
    distance: float
    duration: float

@dataclass
class TrackedParticipant:
    timestamp: float
    participant: Participant
    movement_history: List[Tuple[float, float]] = field(default_factory=list)
    prior_stateaction: Optional[StateAction] = None

CORRECT_TRANSITIONS = {
    "null": {
        "initialization": "O2"
    },
    "H2O": {
        "choro": "O2",
    },
    "O2": {
        "rabbit": "CO2",
    },
    "CO2": {
        "choro": "Sugar"
    },
    "Sugar": {
        "plant": "Thinking_H2O",
    },
    "Thinking_H2O": {
        "roots": "H2O"
    }
}

# In-game items
ITEMS = {
    "rabbit": {"lt": (50, 370), "rb": (170, 465)},
    "plant": {"lt": (200, 95), "rb": (365, 460)}, # Mostly students don't purposefully transition from Sugar to Thinking_H2O
    "choro1": {"lt": (445, 60), "rb": (545, 170)},
    "choro2": {"lt": (500, 390), "rb": (600, 500)},
    "roots": {"lt": (200, 460), "rb": (365, 560)},
}

In [4]:
# Helper functions
def xy_transforms(xy: Tuple[float, float], w: int, h:int, c: Dict[str, float]) -> Tuple[float, float]:
    x, y = xy
    xx = int(((x+1)*h/2)*c['AFFINE'][0] + c['OFFSET'][0])
    yy = int(((y+1)*w/2)*c['AFFINE'][1] + c['OFFSET'][1])
    return (xx, yy)

def detect_action(xy: Tuple[float, float], w: int, h:int, c: Dict[str, float]) -> Tuple[float, float]:
    xy = xy_transforms(xy, w, h, c)

    # Check if they are inside any of the items
    for item, coords in ITEMS.items():
        if coords['lt'][0] < xy[0] < coords['rb'][0] and coords['lt'][1] < xy[1] < coords['rb'][1]:
            return item
        
    return None

def compute_distance_list(xys: List[Tuple[float, float]]):
    distances = []
    for i in range(len(xys)-1):
        distances.append(np.linalg.norm(np.array(xys[i+1]) - np.array(xys[i])))
    return distances

def render(game_state: GameState, frame: np.ndarray, c: Dict[str, float]):

    if game_state.participants:
        for p in game_state.participants.values():
            if isinstance(p.position[0], str):
                p.position = (eval(p.position[0]), eval(p.position[1]))
            xy = xy_transforms(p.position, frame.shape[0], frame.shape[1], c)
            frame = cv2.circle(frame, xy, 30, (0, 0, 255), 1)
            frame = cv2.putText(
                frame, 
                p.id, 
                (xy[0]-22, xy[1]-11), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.4, 
                (0,0,0), 
                2, 
                cv2.LINE_AA
            )
            frame = cv2.putText(
                frame, 
                p.id, 
                (xy[0]-22, xy[1]-11), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.4, 
                (255,255,255), 
                1, 
                cv2.LINE_AA
            )
            frame = cv2.putText(
                frame, 
                p.state, 
                (xy[0]-5*(len(p.state)), xy[1]+13), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.6, 
                (0,0,0), 
                2,
                cv2.LINE_AA
            )
            frame = cv2.putText(
                frame, 
                p.state, 
                (xy[0]-5*(len(p.state)), xy[1]+13), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.6, 
                (255,255,255), 
                1,
                cv2.LINE_AA
            )

    return frame

In [6]:
# Put in ceil to allow debugging
def get_state_action_sequence(timestamp, tracked_participants: Dict[str, TrackedParticipant], game_state: GameState, c: Dict[str, float], game_phase: GamePhaseState) -> Dict[str, StateAction]:
    
    # Check for the game phase
    if game_phase.phase in ['before_game', 'paused']:
        return {}

    # Create container
    state_action_sequence: Dict[str, StateAction] = {}

    for p in game_state.participants.values():
        if p.id not in tracked_participants:
            tracked_participants[p.id] = TrackedParticipant(
                timestamp,
                p
            )
        else:
            # Identify if a transition has occurred
            if tracked_participants[p.id].participant.state != p.state:

                # If occur, compute state-action information
                distance = sum(compute_distance_list(tracked_participants[p.id].movement_history))

                # Depending on phase, the action can be different
                if game_phase.phase == 'molecule_select':
                    action = 'user_select'
                else:
                    action = list(CORRECT_TRANSITIONS[tracked_participants[p.id].participant.state].keys())[0]

                state_action = StateAction(
                    timestamp=timestamp,
                    phase=game_phase.phase,
                    state=tracked_participants[p.id].participant.state,
                    action=action,
                    out_state=p.state,
                    distance=distance,
                    duration=timestamp - tracked_participants[p.id].timestamp,
                    success = True
                )
                state_action_sequence[p.id] = state_action

                # Reset information
                # p.position = (eval(p.position[0]), eval(p.position[1]))
                tracked_participants[p.id].timestamp = timestamp
                tracked_participants[p.id].movement_history = [p.position]
                tracked_participants[p.id].participant = p # position and state
                tracked_participants[p.id].prior_stateaction = state_action

            else:
                # Identify if an action has occurred (incorrect one)
                # p.position = (eval(p.position[0]), eval(p.position[1]))
                action = detect_action(p.position, 566, 650, c)
                if action:

                    # Handling choro1 and choro2 to choro
                    action = action.replace('1','').replace('2', '')

                    # Detect if we obtain a correct transition before the logs caught up
                    correct_action = list(CORRECT_TRANSITIONS[tracked_participants[p.id].participant.state].keys())[0]
                    if action == correct_action:
                        continue

                    # if p.id == 'pz6942':
                    #     print(action)

                    # Avoid duplicate state_actions (if the student stays in the same place)
                    prior_state_action = tracked_participants[p.id].prior_stateaction
                    if not prior_state_action or action != prior_state_action.action:

                        # If occur, compute state-action information
                        distance = sum(compute_distance_list(tracked_participants[p.id].movement_history))
                        state_action = StateAction(
                            timestamp=timestamp,
                            phase=game_phase.phase,
                            state=tracked_participants[p.id].participant.state,
                            action=action,
                            out_state=p.state,
                            distance=distance,
                            duration=timestamp - tracked_participants[p.id].timestamp,
                            success=False
                        )
                        state_action_sequence[p.id] = state_action
                        tracked_participants[p.id].prior_stateaction = state_action

                        # if p.id == 'pz6942':
                        #     print(f"State-action detected: {state_action}")

                # Update data
                tracked_participants[p.id].participant.position = p.position
                tracked_participants[p.id].movement_history.append(p.position)

    return state_action_sequence


# Transformation constants
# START_INDEX = (3*60+25)*30 # Day 11
# START_INDEX = 165*30 # Day 12
START_INDEX = 110*30 # Day 13
CORRECTIONS = {'OFFSET': (-252,-280), 'AFFINE': (1.8,2)}
RECORD = True

def state_action_processing(id, vid_file, game_state_file, annotated_events_file):
    
    # Load the data
    assert vid_file.exists()
    cap = cv2.VideoCapture(str(vid_file))
    fps = cap.get(cv2.CAP_PROP_FPS)
    N = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # N = 3000

    assert game_state_file.exists()
    game_state_logs = pd.read_csv(game_state_file)

    assert annotated_events_file.exists()
    annotated_events = pd.read_csv(annotated_events_file)

    # Reset video and logs
    cap.set(cv2.CAP_PROP_POS_FRAMES, START_INDEX)
    game_state_pointer = 0
    game_event_pointer = 0

    # Create container for each participant state-action data
    tracked_participants: Dict[str, TrackedParticipant] = {}
    sequences: Dict[str, List[StateAction]] = {}
    game_phase = GamePhaseState(phase='before_game')

    # If you want to save the video
    if RECORD:
        video_file = DATA_DIR / 'state_action_sequences' / f"{id}-video.mp4"
        fps = cap.get(cv2.CAP_PROP_FPS)
        fourcc = cv2.VideoWriter_fourcc('F','M','P','4')
        out = cv2.VideoWriter(str(video_file),fourcc,fps,(650,566))

    for i in tqdm(range(N), total=N):
        
        ret, frame = cap.read()
        if not ret:
            break

        frame_timestamp = (START_INDEX + i) / fps
        frame = cv2.putText(
            frame, 
            f"{pd.Timestamp(frame_timestamp, unit='s').strftime('%H:%M:%S.%f')}", 
            (5,30), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                1, 
                (0,0,255), 
                1, 
                cv2.LINE_AA
            )    
        
        # Processing events (like Game Phase)
        while len(annotated_events) > game_event_pointer+1 and annotated_events['timestamp'].iloc[game_event_pointer+1] < frame_timestamp:
            event = annotated_events.iloc[game_event_pointer+1]
            if event.event_type == 'phase':
                data = event.event_data.replace("“", '"')
                data = data.replace("”", '"')
                phase = json.loads(data)['phase']
                assert phase in ['before_game', 'molecule_select', 'normal', 'paused']
                game_phase.phase = phase

            game_event_pointer += 1

        # Obtain game state information
        while game_state_logs['timestamp'].iloc[game_state_pointer+1] < frame_timestamp:
            game_state_pointer += 1

        game_state = GameState.from_json(game_state_logs.iloc[game_state_pointer].state)

        # For macro alignment
        # frame = cv2.putText(
        #     frame, 
        #     f"{game_state_pointer}", 
        #     (5,60), 
        #     cv2.FONT_HERSHEY_SIMPLEX, 
        #     1, 
        #     (0,0,255), 
        #     1, 
        #     cv2.LINE_AA
        # )
        frame = cv2.putText(
            frame, 
            f"{game_phase.phase}", 
            (5,60), 
            cv2.FONT_HERSHEY_SIMPLEX,
            1, 
            (0,0,255), 
            1, 
            cv2.LINE_AA
        )

        # For each item, draw the rectangle
        for item, item_data in ITEMS.items():
            lt = item_data['lt']
            rb = item_data['rb']
            frame = cv2.rectangle(frame, lt, rb, (0, 0, 255), 1)
            frame = cv2.putText(
                frame, 
                item, 
                (lt[0], lt[1]+10), 
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.4, 
                (0,0,255), 
                1, 
                cv2.LINE_AA
            )

        frame = render(game_state, frame, CORRECTIONS)
        cv2.imshow('frame', imutils.resize(frame, width=1000))
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

        if RECORD:
            out.write(frame)

        # Perform sequence extraction
        timestamp = game_state_logs['timestamp'].iloc[game_state_pointer]
        step_sequences = get_state_action_sequence(timestamp, tracked_participants, game_state, CORRECTIONS, game_phase)

        for p, s in step_sequences.items():
            if p not in sequences:
                sequences[p] = []
            sequences[p].append(s)

    cv2.destroyAllWindows()
    if RECORD:
        out.release()

    # Convert and save data
    for p, data in sequences.items():
        df_data = collections.defaultdict(list)
        for d in data:
            dict_data = d.to_dict()
            for k, v in dict_data.items():
                df_data[k].append(v)
        df = pd.DataFrame(df_data)
        df.to_csv(DATA_DIR / 'state_action_sequences' / f'{id}_{p}.csv', index=False)

# Processing - Day 11
# state_action_processing(
#     'day11', 
#     DATA_DIR / 'videos' / 'day 11' / "day11-screen-recording.mp4",
#     DATA_DIR / 'time_alignment' / 'day11_aligned_game_state.csv',
#     DATA_DIR / 'videos' / 'day 11' / 'annotated_game_events.csv'
# )

# Processing - Day 12
state_action_processing(
    'day12', 
    DATA_DIR / 'videos' / 'day 12' / "day12-screen-recording.mp4",
    DATA_DIR / 'time_alignment' / 'day12_aligned_game_state.csv',
    DATA_DIR / 'videos' / 'day 12' / 'annotated_game_events.csv'
)

# Processing - Day 13
# state_action_processing(
#     'day13', 
#     DATA_DIR / 'videos' / 'day 13' / "day13-screen-recording-corrected.mp4",
#     DATA_DIR / 'time_alignment' / 'day13_aligned_game_state-complete.csv',
#     DATA_DIR / 'videos' / 'day 13' / 'annotated_game_events.csv'
# )

OpenCV: FFMPEG: tag 0x34504d46/'FMP4' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'
 83%|████████▎ | 16498/19798 [02:24<00:28, 113.88it/s]
