In [1]:
import pandas as pd
from typing import List, Dict, Optional
import re

def predict_from_dict(model, input_dict, feature_order=(
            'Shots on Goal_diff', 'Shots off Goal_diff',
            'Corner Kicks_diff','Blocked Shots_diff','team id',
            'Goalkeeper Saves_diff'
        )):
    """
    Make a single prediction from a dictionary of feature values.

    model: object exposing ``predict(DataFrame)``; the first element of its
           result is returned.
    input_dict: dict mapping feature name -> value
                (e.g. {"Shots on Goal_diff": 2, "team id": 0, ...}).
                Extra keys are ignored when ``feature_order`` is non-empty.
    feature_order: sequence of feature names giving the column order of the
                   one-row frame handed to the model. Pass a falsy value
                   (``None`` / empty) to use the key order of ``input_dict``.

    Returns the first element of ``model.predict(...)``.

    Raises KeyError if any name in ``feature_order`` is absent from
    ``input_dict``; the message lists every missing feature.
    """
    if feature_order:
        # Materialise once so any iterable (tuple, list, Index) is accepted.
        columns = list(feature_order)
        missing = [feat for feat in columns if feat not in input_dict]
        if missing:
            raise KeyError(f"input_dict is missing required features: {missing}")
        row = [input_dict[feat] for feat in columns]
        df = pd.DataFrame([row], columns=columns)
    else:
        # No explicit order requested: fall back to the dict's own key order.
        df = pd.DataFrame([input_dict])

    # The frame has exactly one row, so return the single prediction.
    return model.predict(df)[0]


In [16]:
import pandas as pd
import re
from typing import List, Dict, Optional

class LiveMatchTracker:
    """Tracks live match events from text data (tweets, commentary).

    State kept on the instance:
      * ``events``        - {"events": [...]} every event dict seen so far.
      * ``recent_events`` - {"events": [...]} events received since the last
                            ``generate_summary`` call (cleared by that call).
      * ``stats``         - cumulative counters keyed by the feature names that
                            are passed on to the prediction step.

    An event dict has the shape ``{"time": str, "event": str}``.
    """

    # Keywords per event type. Types are tried in this order and the first
    # type with any keyword contained in the text wins (plain substring test).
    # NOTE(review): because this is a substring test, 'goal' also matches
    # "goalkeeper" / "on goal"; kept as-is to preserve existing labelling.
    _EVENT_PATTERNS = {
        'goal': [r'goal', r'scores', r'finds the net', r'into the goal'],
        'yellow_card': [r'yellow card', r'booked', r'cautioned'],
        'red_card': [r'red card', r'sent off', r'dismissed'],
        'save': [r'save', r'saved', r'keeper', r'goalkeeper'],
        'shot': [r'shot', r'shoots', r'effort'],
        'corner': [r'corner', r'corner kick'],
        'substitution': [r'substitution', r'replaced', r'comes on'],
        'offside': [r'offside', r'flagged'],
        'foul': [r'foul', r'fouled', r'free kick']
    }

    # First run of digits, optional "+extra", optional trailing minute mark
    # (ASCII apostrophe or U+2019 right single quotation mark).
    _TIME_PATTERN = re.compile(r"(\d+)(?:\+(\d+))?['\u2019]?")

    def __init__(self):
        """Start with empty event logs and all counters at zero."""
        self.events = {"events": []}
        self.stats = {
            "Corner Kicks_diff": 0,
            "team id": 0,
            "Blocked Shots_diff": 0,
            "Shots on Goal_diff": 0,
            "Shots off Goal_diff": 0,
            "Goalkeeper Saves_diff": 0
        }
        self.recent_events = {"events": []}

    def extract_events_from_text(self, text_batch: List[str]) -> Dict:
        """Lower-case each text, parse it, and collect the recognised events.

        Returns ``{"events": [event_dict, ...]}``; texts that match no keyword
        contribute nothing.
        """
        parsed = (self._parse_single_text(text.lower()) for text in text_batch)
        return {"events": [event for event in parsed if event]}

    def _parse_single_text(self, text: str) -> Optional[Dict]:
        """Classify one text and return its event dict, or None if unrecognised.

        ``time`` is taken from the first number in the text ("45" or "45+2");
        it is "" when the text contains no digits. ``event`` is
        "<Type>: <first 100 characters of text>".
        """
        time_str = ""
        time_match = self._TIME_PATTERN.search(text)
        if time_match:
            base = int(time_match.group(1))
            extra = int(time_match.group(2)) if time_match.group(2) else 0
            time_str = f"{base}+{extra}" if extra > 0 else str(base)

        for event_type, keywords in self._EVENT_PATTERNS.items():
            if any(keyword in text for keyword in keywords):
                return {
                    "time": time_str,
                    "event": f"{event_type.replace('_', ' ').title()}: {text[:100]}"
                }
        return None

    def update_events(self, new_events: Dict):
        """Record new events in both the full log and the recent buffer.

        Accepts either a single event dict (has an "event" key) or a batch
        ``{"events": [event_dict, ...]}``. Anything else is ignored.
        """
        if "event" in new_events:
            # Store the event dict itself (not just its text) so that
            # generate_summary can call .get("event") on every entry.
            batch = [new_events]
        elif "events" in new_events:
            batch = list(new_events["events"])
        else:
            return
        self.events["events"].extend(batch)
        self.recent_events["events"].extend(batch)

    def generate_summary(self) -> Dict:
        """Summarise the recent events, update ``stats`` and clear the buffer.

        Returns ``{"summary": str, "stats": dict}``. ``stats`` is always a
        copy of the cumulative counters, so callers cannot mutate tracker
        state through it.
        """
        if not self.recent_events["events"]:
            return {"summary": "No major events in this period.", "stats": self.stats.copy()}

        event_counts = {}
        for event in self.recent_events["events"]:
            event_text = event.get("event", "").lower()
            # Checked in priority order: a text mentioning both "save" and
            # "shot" is counted as a save only.
            if "save" in event_text:
                event_counts["saves"] = event_counts.get("saves", 0) + 1
                self.stats["Goalkeeper Saves_diff"] += 1
                # NOTE(review): a save also bumps the blocked-shots counter;
                # confirm this matches how the model's features were built.
                self.stats["Blocked Shots_diff"] += 1
            elif "shot" in event_text:
                event_counts["shots"] = event_counts.get("shots", 0) + 1
                if "on goal" in event_text or "on target" in event_text:
                    self.stats["Shots on Goal_diff"] += 1
                else:
                    self.stats["Shots off Goal_diff"] += 1
            elif "corner" in event_text:
                event_counts["corner"] = event_counts.get("corner", 0) + 1
                self.stats["Corner Kicks_diff"] += 1

        summary_parts = [f"{count} {etype}" for etype, count in event_counts.items() if count > 0]
        summary = f"Recent events: {', '.join(summary_parts)}" if summary_parts else "No major events."
        self.recent_events = {"events": []}
        return {"summary": summary, "stats": self.stats.copy()}

    def process_match_data(self, file_path: str, model, interval_minutes: int = 5,
                           timestamp_col="timestamp", tweet_col="tweet"):
        """Replay a CSV of timestamped texts in fixed windows and print results.

        For every ``interval_minutes`` window that contains at least one text,
        the texts are parsed, the stats updated, and the summary, cumulative
        stats and ``predict_from_dict(model, stats)`` result are printed.

        file_path: CSV file to read.
        model: passed through to ``predict_from_dict``.
        interval_minutes: window length in minutes; must be positive.
        timestamp_col: column holding "MM:SS" strings (unparseable -> minute 0).
        tweet_col: column holding the text to analyse.

        Raises ValueError if ``interval_minutes`` is not positive or
        ``timestamp_col`` is missing. A missing file is reported via print.
        """
        if interval_minutes <= 0:
            raise ValueError("interval_minutes must be a positive integer.")

        # Only the read itself is guarded, so a FileNotFoundError raised
        # further down is not misreported as a missing input file.
        try:
            df = pd.read_csv(file_path)
        except FileNotFoundError:
            print(f"File {file_path} not found.")
            return

        if timestamp_col not in df.columns:
            raise ValueError(f"Timestamp column '{timestamp_col}' not found.")
        if df.empty:
            # max() of an empty column is NaN and int(NaN) would raise.
            print(f"No rows found in {file_path}.")
            return

        minutes = df[timestamp_col].astype(str).apply(self._mmss_to_minutes)
        timeline = df.assign(minute=minutes).sort_values("minute").reset_index(drop=True)
        max_minute = int(timeline["minute"].max())

        for start in range(0, max_minute + 1, interval_minutes):
            end = start + interval_minutes
            in_window = (timeline["minute"] >= start) & (timeline["minute"] < end)
            tweets_batch = timeline.loc[in_window, tweet_col].dropna().astype(str).tolist()
            if not tweets_batch:
                continue

            self.update_events(self.extract_events_from_text(tweets_batch))
            summary = self.generate_summary()
            print(f"\nInterval {self._minutes_to_mmss(start)} - {self._minutes_to_mmss(end)}")
            print(" Summary:", summary["summary"])
            print(" Stats:", summary["stats"])
            prediction = predict_from_dict(model, summary["stats"])
            print(" Prediction:", prediction)

    @staticmethod
    def _mmss_to_minutes(stamp: str) -> float:
        """Convert an "MM:SS" string to fractional minutes; 0 if unparseable."""
        try:
            minutes, seconds = map(int, stamp.split(":"))
        except ValueError:
            # Wrong number of parts or non-integer parts: treat as kick-off.
            return 0.0
        return minutes + seconds / 60

    @staticmethod
    def _minutes_to_mmss(minutes: float) -> str:
        """Format fractional minutes as zero-padded "MM:SS"."""
        # Round on total seconds so a value like 4.9999 becomes 05:00, not 04:60.
        m, s = divmod(int(round(minutes * 60)), 60)
        return f"{m:02d}:{s:02d}"


In [17]:
import joblib

# Load the previously saved model from the working directory.
# NOTE(review): joblib.load unpickles the file, which can execute arbitrary
# code during loading - only load model files from a trusted source.
model = joblib.load("my_model.pkl")

In [23]:

# Replay the tweet file in 5-minute windows and print a summary, the
# cumulative stats and the model prediction for each non-empty window.
tracker = LiveMatchTracker()
tracker.process_match_data(
    "tweets.csv",
    model,
    interval_minutes=5,
    timestamp_col="timestamp",
    tweet_col="text",
)


Interval 00:00 - 05:00
 Summary: Recent events: 13 corner, 8 shots, 3 saves
 Stats: {'Corner Kicks_diff': 13, 'team id': 0, 'Blocked Shots_diff': 3, 'Shots on Goal_diff': 0, 'Shots off Goal_diff': 8, 'Goalkeeper Saves_diff': 3}
 Prediction: 1

Interval 05:00 - 10:00
 Summary: Recent events: 6 shots, 1 saves, 2 corner
 Stats: {'Corner Kicks_diff': 15, 'team id': 0, 'Blocked Shots_diff': 4, 'Shots on Goal_diff': 0, 'Shots off Goal_diff': 14, 'Goalkeeper Saves_diff': 4}
 Prediction: 1

Interval 10:00 - 15:00
 Summary: Recent events: 11 corner, 6 shots
 Stats: {'Corner Kicks_diff': 26, 'team id': 0, 'Blocked Shots_diff': 4, 'Shots on Goal_diff': 0, 'Shots off Goal_diff': 20, 'Goalkeeper Saves_diff': 4}
 Prediction: 1

Interval 15:00 - 20:00
 Summary: Recent events: 6 shots, 11 corner
 Stats: {'Corner Kicks_diff': 37, 'team id': 0, 'Blocked Shots_diff': 4, 'Shots on Goal_diff': 0, 'Shots off Goal_diff': 26, 'Goalkeeper Saves_diff': 4}
 Prediction: 1

Interval 20:00 - 25:00
 Summary: Recent