## **Install the dependency packages**

In [1]:
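# Each "!" command runs in its own subshell, so a virtual environment activated here
# would not persist. Create and activate .venv before starting Jupyter, then install
# the packages into the running kernel's environment:
%pip install google-cloud-pubsub python-dotenv pandas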


## **Import Libraries**

In [2]:
import pandas as pd
import json, pathlib, os, csv
from google.cloud import pubsub_v1



## **Initialization**

In [3]:
PROJECT_ID = "iisc-data-engineering-project"
TOPIC_ID = "match_data"
SUBSCRIPTION_ID = "match_data-sub"
matches_FILE = "../../data/raw/matches.csv"
deliveries_FILE = "../../data/raw/deliveries.csv"
batch_setting = pubsub_v1.types.BatchSettings(
    max_bytes=5*1024*1024,  # Maximum bytes to batch
    max_messages=1000,  # Maximum number of messages to batch
    max_latency=0.5,  # Maximum latency in seconds
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_setting)
topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
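
The identifiers above are hardcoded. One option is to read them from environment variables and fall back to the notebook's values, as in this minimal sketch. The variable names (`PUBSUB_PROJECT_ID` and so on) and the `load_settings` helper are hypothetical and not part of any library.

```python
import os
from typing import Mapping


def load_settings(env: Mapping[str, str] = os.environ) -> dict:
    """Return Pub/Sub settings, preferring environment values over notebook defaults."""
    # The environment variable names below are assumptions made for this sketch.
    return {
        "project_id": env.get("PUBSUB_PROJECT_ID", "iisc-data-engineering-project"),
        "topic_id": env.get("PUBSUB_TOPIC_ID", "match_data"),
        "subscription_id": env.get("PUBSUB_SUBSCRIPTION_ID", "match_data-sub"),
    }


settings = load_settings()
print(settings)
```

Since `python-dotenv` is installed in the first cell, calling its `load_dotenv()` before `load_settings()` could populate the environment from a local `.env` file.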

## **Two datasets with the following structure are used, covering IPL 2008-2024**
1. Matches - Match-level data. Each row represents a single IPL match, including venue, teams, toss results, match outcome, and player awards.
2. Deliveries - Each row represents one delivery (ball) in an IPL match. It links to matches.csv via the match_id column.

**Dataset size**
1. Matches - 1095 match entries
2. Deliveries - 260920 delivery entries
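
The `match_id` link between the two files can be illustrated with a small sketch that sums runs per match and rejects deliveries that point at an unknown match. The rows below are made up for illustration and are not taken from the dataset. The `runs_per_match` helper is hypothetical.

```python
from collections import defaultdict

# Illustrative rows only: values are invented for this sketch.
matches_rows = [
    {"id": "1", "team1": "Team A", "team2": "Team B"},
    {"id": "2", "team1": "Team C", "team2": "Team D"},
]
deliveries_rows = [
    {"match_id": "1", "over": "0", "ball": "1", "total_runs": "4"},
    {"match_id": "1", "over": "0", "ball": "2", "total_runs": "1"},
    {"match_id": "2", "over": "0", "ball": "1", "total_runs": "6"},
]


def runs_per_match(matches_rows, deliveries_rows):
    """Sum total_runs per match by following deliveries.match_id -> matches.id."""
    known_ids = {m["id"] for m in matches_rows}
    totals = defaultdict(int)
    for d in deliveries_rows:
        if d["match_id"] not in known_ids:
            # A delivery without a parent match breaks the link between the files.
            raise KeyError(f"delivery references unknown match_id {d['match_id']}")
        totals[d["match_id"]] += int(d["total_runs"])
    return dict(totals)


totals = runs_per_match(matches_rows, deliveries_rows)
print(totals)
```

With the DataFrames loaded later in the notebook, the equivalent join would be `deliveries.merge(matches, left_on="match_id", right_on="id")`.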


**Matches Dataset**

| Field | Description |
|:--|:--|
| **id** | Unique match identifier |
| **season** | IPL season year |
| **city** | Venue city |
| **date** | Date of the match |
| **match_type** | Type of match (League/Qualifier/Final) |
| **player_of_match** | Player awarded “Man of the Match” |
| **venue**| Stadium name |
| **team1** | Home team |
| **team2** | Away team |
| **toss_winner** | Team that won the toss |
| **toss_decision** | Toss decision (bat/field) |
| **winner** | Match-winning team |
| **result** | How the match was decided |
| **result_margin** | Margin of victory (in runs or wickets) |
| **target_runs** | Target set for the chasing team |
| **target_overs** | Overs for the target innings |
| **super_over** | Whether a Super Over occurred |
| **method** | Method (Duckworth-Lewis, etc.), if applicable |
| **umpire1** | First on-field umpire |
| **umpire2** | Second on-field umpire |


> ```json
> {
>   "eventType": "match",
>   "payload": {
>     "id": 980933,
>     "season": "2016",
>     "city": "Delhi",
>     "date": "2016-04-23",
>     "match_type": "League",
>     "player_of_match": "SV Samson",
>     "venue": "Feroz Shah Kotla",
>     "team1": "Delhi Daredevils",
>     "team2": "Mumbai Indians",
>     "toss_winner": "Mumbai Indians",
>     "toss_decision": "field",
>     "winner": "Delhi Daredevils",
>     "result": "runs",
>     "result_margin": 10.0,
>     "target_runs": 165.0,
>     "target_overs": 20.0,
>     "super_over": "N",
>     "method": "NaN",
>     "umpire1": "S Ravi",
>     "umpire2": "C Shamshuddin"
>   }
> }
> ```

---
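
The sample above has no value for `method`. When a record comes from pandas, a missing value is a float NaN, and Python's `json.dumps` writes it as a bare `NaN` token by default, which strict JSON parsers reject. A minimal sketch of replacing NaN with `null` before encoding follows. The `nan_to_none` helper is hypothetical, and the record is a shortened version of the sample.

```python
import json
import math


def nan_to_none(record: dict) -> dict:
    """Return a copy of record with float NaN values replaced by None."""
    return {
        key: (None if isinstance(value, float) and math.isnan(value) else value)
        for key, value in record.items()
    }


record = {"id": 980933, "result_margin": 10.0, "method": float("nan")}
message = {"eventType": "match", "payload": nan_to_none(record)}

# allow_nan=False makes json.dumps raise ValueError if any NaN slipped through.
encoded = json.dumps(message, allow_nan=False).encode("utf-8")
print(encoded.decode("utf-8"))
```

Using `allow_nan=False` turns a silent formatting problem into an explicit error at publish time.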


**Deliveries Dataset**

| Field | Description |
|:--|:--|
| **match_id** | Identifier linking delivery to its match |
| **inning** | Inning number (1 or 2) |
| **batting_team** | Team currently batting |
| **bowling_team** | Team currently bowling |
| **over** | Over number in the innings |
| **ball** | Ball number within the over |
| **batter** | Name of the batsman on strike |
| **bowler** | Name of the bowler delivering the ball |
| **non_striker** | Name of the non-striker batsman |
| **batsman_runs** | Runs scored by the batsman from the delivery |
| **extra_runs** | Runs awarded as extras (wide, no-ball, bye, etc.) |
| **total_runs** | Total runs scored off the delivery (batsman + extras) |
| **extras_type** | Type of extra (if any) — e.g., wides, byes |
| **is_wicket** | Indicates if a wicket fell on this delivery (0/1) |
| **player_dismissed** | Name of player dismissed, if any |
| **dismissal_kind** | Type of dismissal (bowled, caught, etc.) |
| **fielder** | Name of fielder involved in the dismissal |


```json
{
  "eventType": "deliveries",
  "payload": {
    "match_id": "1136597",
    "inning": "1",
    "batting_team": "Mumbai Indians",
    "bowling_team": "Kolkata Knight Riders",
    "over": "4",
    "ball": "3",
    "batter": "RV Uthappa",
    "bowler": "MJ McClenaghan",
    "non_striker": "N Rana",
    "batsman_runs": "1",
    "extra_runs": "0",
    "total_runs": "1",
    "extras_type": "",
    "is_wicket": "0",
    "player_dismissed": "NA",
    "dismissal_kind": "NA",
    "fielder": "NA"
  }
}
```


In [4]:
matches = pd.read_csv(matches_FILE)
deliveries = pd.read_csv(deliveries_FILE)
print(matches.shape)
print(deliveries.shape)
print("Matches DataFrame:")
print(matches.head(5))

print("\nDeliveries DataFrame:")
print(deliveries.head(5))

(1095, 20)
(260920, 17)
Matches DataFrame:
       id   season        city        date match_type player_of_match  \
0  335982  2007/08   Bangalore  2008-04-18     League     BB McCullum   
1  335983  2007/08  Chandigarh  2008-04-19     League      MEK Hussey   
2  335984  2007/08       Delhi  2008-04-19     League     MF Maharoof   
3  335985  2007/08      Mumbai  2008-04-20     League      MV Boucher   
4  335986  2007/08     Kolkata  2008-04-20     League       DJ Hussey   

                                        venue                        team1  \
0                       M Chinnaswamy Stadium  Royal Challengers Bangalore   
1  Punjab Cricket Association Stadium, Mohali              Kings XI Punjab   
2                            Feroz Shah Kotla             Delhi Daredevils   
3                            Wankhede Stadium               Mumbai Indians   
4                                Eden Gardens        Kolkata Knight Riders   

                         team2                  t
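
`pd.read_csv` infers numeric types, whereas the deliveries sample shown earlier carries every value as a string, which is what Python's `csv` module produces. If a consumer needs typed fields, a coercion step could look like the sketch below. The choice of integer fields and the `coerce_delivery` helper are assumptions made for illustration.

```python
# Fields assumed to be integers, based on the Deliveries schema table.
INT_FIELDS = (
    "match_id", "inning", "over", "ball",
    "batsman_runs", "extra_runs", "total_runs", "is_wicket",
)


def coerce_delivery(row: dict) -> dict:
    """Return a copy of a deliveries row with the integer fields converted from str."""
    typed = dict(row)
    for field in INT_FIELDS:
        typed[field] = int(row[field])
    return typed


# Values copied from the deliveries sample payload in this notebook.
row = {
    "match_id": "1136597", "inning": "1",
    "batting_team": "Mumbai Indians", "bowling_team": "Kolkata Knight Riders",
    "over": "4", "ball": "3",
    "batter": "RV Uthappa", "bowler": "MJ McClenaghan", "non_striker": "N Rana",
    "batsman_runs": "1", "extra_runs": "0", "total_runs": "1",
    "extras_type": "", "is_wicket": "0",
    "player_dismissed": "NA", "dismissal_kind": "NA", "fielder": "NA",
}
typed_row = coerce_delivery(row)
print(typed_row)
```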

## **Publish one sample record to test the publish and subscribe flow in Pub/Sub**

In [5]:
# Pick one reproducible sample row and convert it to a plain dict
sample_match = matches.sample(1, random_state=42).to_dict(orient="records")[0]
print("Sample Match Record:")
print(sample_match)
message = {
    "event_type": "match",
    "payload": sample_match
}
# default=str converts values that json cannot serialize natively into strings
data = json.dumps(message, default=str).encode("utf-8")
future = publisher.publish(topic_path, data)
# result() blocks until the publish completes and returns the message ID
print(f"Published message ID: {future.result()}")
print(json.dumps(message, indent=2)[:600])

# Synchronously pull up to 10 messages and acknowledge each one after printing it
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 10}
)
for received_message in response.received_messages:
    print(f"Received message: {received_message.message.data.decode('utf-8')}")
    subscriber.acknowledge(
        request={
            "subscription": subscription_path,
            "ack_ids": [received_message.ack_id]
        }
    )

Sample Match Record:
{'id': 980933, 'season': '2016', 'city': 'Delhi', 'date': '2016-04-23', 'match_type': 'League', 'player_of_match': 'SV Samson', 'venue': 'Feroz Shah Kotla', 'team1': 'Delhi Daredevils', 'team2': 'Mumbai Indians', 'toss_winner': 'Mumbai Indians', 'toss_decision': 'field', 'winner': 'Delhi Daredevils', 'result': 'runs', 'result_margin': 10.0, 'target_runs': 165.0, 'target_overs': 20.0, 'super_over': 'N', 'method': nan, 'umpire1': 'S Ravi', 'umpire2': 'C Shamshuddin'}
Published message ID: 16998618098451568
{
  "event_type": "match",
  "payload": {
    "id": 980933,
    "season": "2016",
    "city": "Delhi",
    "date": "2016-04-23",
    "match_type": "League",
    "player_of_match": "SV Samson",
    "venue": "Feroz Shah Kotla",
    "team1": "Delhi Daredevils",
    "team2": "Mumbai Indians",
    "toss_winner": "Mumbai Indians",
    "toss_decision": "field",
    "winner": "Delhi Daredevils",
    "result": "runs",
    "result_margin": 10.0,
    "target_runs": 165.0,
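
On the consuming side, a subscriber has to decode each message body and branch on its event type. The sketch below shows one way to do that with the standard library only. It accepts both the `event_type` key used in this test message and the `eventType` key used by the bulk publisher. The `parse_event` helper and its set of known types are hypothetical.

```python
import json
from typing import Tuple

KNOWN_EVENT_TYPES = {"match", "deliveries"}


def parse_event(data: bytes) -> Tuple[str, dict]:
    """Decode a message body and return (event type, payload)."""
    message = json.loads(data.decode("utf-8"))
    # Accept either spelling of the event type key.
    event_type = message.get("eventType", message.get("event_type"))
    if event_type not in KNOWN_EVENT_TYPES:
        raise ValueError(f"unknown event type: {event_type!r}")
    return event_type, message["payload"]


body = json.dumps({"event_type": "match", "payload": {"id": 980933}}).encode("utf-8")
event_type, payload = parse_event(body)
print(event_type, payload)
```

In the pull loop, `parse_event(received_message.message.data)` would be called before acknowledging, so that a malformed message is not acknowledged silently.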
   

## **Push the full datasets to the Pub/Sub broker service**
*Two event types are maintained: "match" and "deliveries"*
- "match" event type for the Matches dataset
- "deliveries" event type for the Deliveries dataset

In [6]:
def publish_csv(file_path: str, event_type: str) -> int:
    """
    Reads a CSV and publishes each row as:
    {
      "eventType": "<event_type>",
      "payload": {<csv row as JSON>}
    }
    Returns number of published messages.
    """
    p = pathlib.Path(file_path)
    if not p.exists():
        raise FileNotFoundError(f"CSV not found: {p.resolve()}")

    futures = []
    with p.open("r", newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for row in reader:

            message_obj = {
                "eventType": event_type,
                "payload": row,          
            }
            data_bytes = json.dumps(message_obj, ensure_ascii=False).encode("utf-8")

            # Publish message; the extra keyword arguments are sent as message attributes
            fut = publisher.publish(
                topic_path,
                data=data_bytes,
                source_file=p.name,
                eventType=event_type
            )
            futures.append(fut)

    # Ensure all publishes complete (raise on error)
    for fut in futures:
        fut.result()
    return len(futures)

# Publish both Datasets
count_matches = publish_csv(matches_FILE, "match")
count_deliveries = publish_csv(deliveries_FILE, "deliveries")

print(f"Published {count_matches} 'match' messages and {count_deliveries} 'deliveries' messages.")

Published 1095 'match' messages and 260920 'deliveries' messages.
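
`publish_csv` keeps one future per row until the whole file has been read, which for the deliveries file means about 260,000 pending futures. A possible alternative is to wait on futures in fixed-size chunks, as in this sketch. Here `publish_fn` is a hypothetical stand-in for a call such as `lambda data: publisher.publish(topic_path, data)`, and `fake_publish` exists only so the example runs without a Pub/Sub connection.

```python
from concurrent.futures import Future
from typing import Callable, Iterable


def publish_in_chunks(
    payloads: Iterable[bytes],
    publish_fn: Callable[[bytes], Future],
    chunk_size: int = 1000,
) -> int:
    """Publish payloads, waiting for each chunk of futures before sending more."""
    pending = []
    published = 0
    for payload in payloads:
        pending.append(publish_fn(payload))
        if len(pending) >= chunk_size:
            for fut in pending:
                fut.result()  # re-raises the error if that publish failed
            published += len(pending)
            pending.clear()
    # Wait for the final, possibly partial, chunk.
    for fut in pending:
        fut.result()
    published += len(pending)
    return published


def fake_publish(data: bytes) -> Future:
    """Stand-in publisher that returns an already completed future."""
    fut = Future()
    fut.set_result(str(len(data)))
    return fut


total = publish_in_chunks(
    (f"row-{i}".encode("utf-8") for i in range(2500)), fake_publish, chunk_size=1000
)
print(total)
```

The chunk size trades memory for throughput: smaller chunks hold fewer futures but pause the read loop more often.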
