In [None]:
import requests
import json
import xml.etree.ElementTree as ElementTree
import xmltodict
import sqlite3
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
import time

from config import CTA_API_KEY

## Arrivals API

In [None]:
# Query parameters for the arrivals request below.
params = dict(
    mapid=40630,
    stpid=30122,
    max=3,
    key=CTA_API_KEY,
    outputType='JSON',
)

In [None]:
# Use HTTPS like the other endpoints in this notebook: the API key travels in the
# query string. Fail fast on HTTP errors and never hang on an unresponsive server.
response = requests.get("https://lapi.transitchicago.com/api/1.0/ttarrivals.aspx",
                        params=params, timeout=10)
response.raise_for_status()

In [None]:
# Decoded JSON body of the arrivals response.
arrivals_json = response.json()
arrivals_json

In [None]:
# The arrivals request above asked for JSON (outputType='JSON'), so its body is not
# XML and ElementTree cannot parse it. Re-request the same stop without outputType
# and parse that body instead.
# NOTE(review): assumes the Train Tracker API answers in XML when outputType is
# omitted — confirm against the API documentation.
xml_params = {key: value for key, value in params.items() if key != 'outputType'}
xml_response = requests.get("https://lapi.transitchicago.com/api/1.0/ttarrivals.aspx",
                            params=xml_params, timeout=10)
xml_response.raise_for_status()
root = ElementTree.fromstring(xml_response.content)

In [None]:
# Top-level elements of the arrivals XML document.
# Element.getchildren() was removed in Python 3.9; list(element) replaces it.
children = list(root)
children

In [None]:
# `child` was never assigned (its assignment was commented out), so reference the
# first top-level element of the parsed document explicitly.
first_child = root[0]
first_child.text

In [None]:
def xml_to_tree(node, numbered_tags=('eta',)):
    """Convert an ElementTree element into a nested dict.

    Leaf children (elements without sub-elements) map their tag to their text.
    Children that have sub-elements are converted recursively. Because a dict
    cannot hold duplicate keys, non-leaf children whose tag is in
    ``numbered_tags`` receive a 1-based suffix (``eta_1``, ``eta_2``, ...) so that
    every occurrence is kept; any other repeated tag keeps only its last value.

    Args:
        node: the ``xml.etree.ElementTree.Element`` to convert.
        numbered_tags: tags whose non-leaf occurrences are numbered. Defaults to
            ``('eta',)``, the only tag the original implementation numbered.

    Returns:
        dict mapping child tags to their text (leaves) or nested dicts.
    """
    tree = {}
    counters = {}
    # Iterate the element directly: Element.getchildren() was removed in Python 3.9.
    for child in node:
        key = child.tag
        if len(child):
            if key in numbered_tags:
                counters[key] = counters.get(key, 0) + 1
                key = f'{key}_{counters[key]}'
            tree[key] = xml_to_tree(child, numbered_tags)
        else:
            tree[key] = child.text

    return tree

In [None]:
# Raw body of the most recent arrivals response.
raw_body = response.text
raw_body

In [None]:
# Show the hand-rolled XML-to-dict conversion with Jupyter's rich display rather than print().
arrivals_tree = xml_to_tree(root)
arrivals_tree

In [None]:
# response.content is the JSON body (outputType='JSON'), which xmltodict cannot
# parse. Serialize the XML document held in `root` and parse that, so xmltodict
# and xml_to_tree work on the same payload.
data_dict = xmltodict.parse(ElementTree.tostring(root))
data_dict

## Follow this train API

In [None]:
# Query parameters for the follow-this-train request below.
params = dict(runnumber=831, key=CTA_API_KEY, outputType='JSON')

In [None]:
# Fail fast on HTTP errors and never hang on an unresponsive server.
response = requests.get("https://lapi.transitchicago.com/api/1.0/ttfollow.aspx",
                        params=params, timeout=10)
response.raise_for_status()

In [None]:
# Decoded JSON body of the follow-this-train response.
follow_json = response.json()
follow_json

## Locations API

In [None]:
# Query parameters for the locations request below.
params = dict(rt='red', key=CTA_API_KEY, outputType='JSON')

In [None]:
# Fail fast on HTTP errors and never hang on an unresponsive server.
response = requests.get("https://lapi.transitchicago.com/api/1.0/ttpositions.aspx",
                        params=params, timeout=10)
response.raise_for_status()

In [None]:
# Decoded JSON body of the locations response.
positions_json = response.json()
positions_json

## Database for storing train run delays per stop

In [None]:
# Local SQLite file for the train_delays table; the connection and cursor stay
# open for the cells that follow.
DB_PATH = "cta_trains.db"
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()

In [None]:
# Create the train_delays table.
# - `if not exists` makes this cell idempotent, so Restart & Run All no longer
#   fails with "table train_delays already exists" on a second run.
# - String columns are declared TEXT: SQLite gives a declared type of `string`
#   NUMERIC affinity, which would silently coerce numeric-looking text to numbers.
# NOTE(review): a table created earlier with the old schema is left as-is; drop it
# manually if the TEXT columns are required.
cur.execute("""
create table if not exists train_delays
(train_line text, run_number int, day date,
previous_station text, next_station text,
delay_in_minutes float)
""")

## Fetch latest train positions (Locations API)

In [None]:
def fetch_latest_etas(train_line: str, timeout: float = 10):
    """Fetch the current train positions for one route from the CTA Locations API.

    Args:
        train_line: route code sent as the ``rt`` query parameter (e.g. ``'red'``).
        timeout: seconds to wait for the server. Without a timeout a stalled
            connection would hang the polling loop forever.

    Returns:
        The decoded JSON body of the ``ttpositions.aspx`` response.

    Raises:
        requests.HTTPError: if the API answers with an HTTP error status.
        requests.Timeout: if the server does not respond within ``timeout`` seconds.
    """
    params = {'rt': train_line,
              'key': CTA_API_KEY,
              'outputType': 'JSON'}

    response = requests.get("https://lapi.transitchicago.com/api/1.0/ttpositions.aspx",
                            params=params, timeout=timeout)
    response.raise_for_status()

    return response.json()

In [None]:
# Latest positions for the 'red' route.
red_line_positions = fetch_latest_etas('red')
red_line_positions

## Create Kafka topic

In [None]:
from kafka.errors import TopicAlreadyExistsError

# NOTE(review): the admin client targets "kafka:29092", but the producer and
# consumer below use "localhost:9092". Typically only one of these listeners is
# reachable from where the kernel runs — confirm which address is correct.
admin_client = KafkaAdminClient(
    bootstrap_servers="kafka:29092",  # Replace with your Kafka broker address
)

topic_name = "train-etas"
num_partitions = 1
replication_factor = 1

new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

# Re-running the notebook must not fail just because the topic already exists;
# always release the admin client's connections.
try:
    admin_client.create_topics([new_topic])
except TopicAlreadyExistsError:
    print(f"topic {topic_name!r} already exists")
finally:
    admin_client.close()

## Set up Kafka producer

In [None]:
def _serialize_json(value):
    """Encode a Python object as UTF-8 JSON bytes for Kafka message values."""
    return json.dumps(value).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=_serialize_json,
)

## Set up a Kafka consumer to print messages from the topic

In [None]:
# Print messages arriving on the topic. Stop after 10 s without a new message so
# this cell (and Restart & Run All) does not block forever; the cell that produces
# messages comes later in the notebook. KafkaConsumer is imported in the first cell.
# NOTE(review): no group_id is set, so kafka-python does not commit offsets, and
# with its default auto_offset_reset='latest' only messages produced after
# subscribing are shown.
consumer = KafkaConsumer('train-etas',
                         bootstrap_servers=['localhost:9092'],
                         consumer_timeout_ms=10_000)

try:
    for message in consumer:
        print(message.value.decode('utf-8'))
finally:
    consumer.close()

## PyFlink: print the Kafka stream

In [None]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
import json

env = StreamExecutionEnvironment.get_execution_environment()
# NOTE(review): FlinkKafkaConsumer needs the Flink Kafka connector JAR on the
# classpath (e.g. via env.add_jars(...)) — confirm the environment provides it.

kafka_consumer = FlinkKafkaConsumer(
    topics=['train-etas'],
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092'}
)

data_stream = env.add_source(kafka_consumer)

data_stream.print()

# print() only adds a sink; nothing runs until the job is submitted. Submit it
# asynchronously: execute() would block the kernel indefinitely on this unbounded
# Kafka source and the polling cell below could never run.
job_client = env.execute_async("train-etas-print")

## Poll the API and send results to Kafka

In [None]:
def poll_api_and_send(train_line, topic='train-etas', interval_seconds=30, max_iterations=None):
    """Repeatedly fetch the latest data for ``train_line`` and publish it to Kafka.

    Args:
        train_line: route code forwarded to ``fetch_latest_etas`` (e.g. ``'red'``).
        topic: Kafka topic to publish to. Defaults to ``'train-etas'``, the topic
            created above and read by the consumer and the PyFlink job. The previous
            hard-coded ``'input-topic'`` was never read by anything.
        interval_seconds: pause between consecutive polls.
        max_iterations: stop after this many polls; ``None`` (default) polls forever,
            matching the original behaviour.
    """
    iteration = 0
    try:
        while max_iterations is None or iteration < max_iterations:
            latest_data = fetch_latest_etas(train_line)
            producer.send(topic, latest_data)
            print('sent latest train eta data')
            iteration += 1

            # No point sleeping after the final poll of a bounded run.
            if max_iterations is None or iteration < max_iterations:
                time.sleep(interval_seconds)
    finally:
        # producer.send only enqueues the record; flush so buffered records are
        # delivered when the loop ends or is interrupted (bounded wait).
        producer.flush(timeout=10)

In [None]:
# Runs until the kernel is interrupted, polling every 30 s.
poll_api_and_send(train_line='red')