In [15]:
import requests
import json
import xml.etree.ElementTree as ElementTree
import xmltodict
import sqlite3
import mysql.connector
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
import time

from datetime import datetime, timedelta

from config import CTA_API_KEY

## Arrivals API

In [None]:
# Query parameters for the Train Tracker arrivals request in the next cell.
params = dict(
    mapid=40630,
    stpid=30122,
    max=3,
    key=CTA_API_KEY,
    outputType='JSON',
)

In [None]:
# Use HTTPS like the other CTA endpoints in this notebook: over plain HTTP the
# API key in the query string is sent in clear text. The timeout keeps a
# stalled connection from hanging the kernel indefinitely.
response = requests.get(
    "https://lapi.transitchicago.com/api/1.0/ttarrivals.aspx",
    params=params,
    timeout=10,
)

In [None]:
# Decoded JSON body of the arrivals response (params request outputType='JSON').
response.json()

In [None]:
# `params` requests outputType='JSON', so `response.text` is JSON and
# ElementTree.fromstring() on it raises ParseError. Re-request the same
# arrivals without outputType so the API returns XML (its format when
# outputType is omitted — TODO confirm against the Train Tracker docs).
xml_params = {name: value for name, value in params.items() if name != 'outputType'}
xml_response = requests.get(
    "https://lapi.transitchicago.com/api/1.0/ttarrivals.aspx",
    params=xml_params,
    timeout=10,
)
# Parse the raw bytes so ElementTree honours any declared XML encoding.
root = ElementTree.fromstring(xml_response.content)

In [None]:
# Element.getchildren() was removed in Python 3.9; list(element) returns the
# same list of direct child elements.
list(root)

In [None]:
# `child` is not otherwise assigned in this notebook, so bind it explicitly
# before use (here: the first child element of `root`).
child = root[0]
child.text

In [None]:
def xml_to_tree(node):
    """Convert an ElementTree element into nested dicts keyed by child tag.

    Leaf children (no sub-elements) map to their ``.text``. Children that have
    sub-elements are converted recursively. Repeated ``eta`` elements that have
    sub-elements are disambiguated as ``eta_1``, ``eta_2``, ... in document
    order. Any other repeated tag keeps only its last occurrence.

    Args:
        node: An ``xml.etree.ElementTree.Element``.

    Returns:
        dict mapping tag names to leaf text (a string or ``None``) or to nested
        dicts of the same shape.
    """
    curr = {}
    eta_num = 1
    # Iterate the element directly: Element.getchildren() was removed in Python 3.9.
    for child in node:
        child_tag = child.tag
        # len() counts sub-elements; avoids relying on Element truthiness.
        if len(child):
            if child_tag == 'eta':
                child_tag += f'_{eta_num}'
                eta_num += 1
            curr[child_tag] = xml_to_tree(child)
        else:
            curr[child_tag] = child.text

    return curr

In [None]:
# Raw response body as a string.
response.text

In [None]:
# Nested-dict view of the parsed XML tree built by xml_to_tree.
print(xml_to_tree(root))

In [None]:
# Parse the response body with xmltodict as an alternative to xml_to_tree.
# NOTE(review): `response` was requested with outputType='JSON', so this body is
# probably JSON rather than XML and xmltodict.parse would fail — parse an XML
# response (requested without outputType) instead. TODO confirm.
data_dict = xmltodict.parse(response.content)

## Follow This Train API

In [None]:
# Query parameters for the Follow This Train request (run number 831).
params = dict(
    runnumber=831,
    key=CTA_API_KEY,
    outputType='JSON',
)

In [None]:
# Timeout so a stalled connection cannot hang the kernel indefinitely.
response = requests.get(
    "https://lapi.transitchicago.com/api/1.0/ttfollow.aspx",
    params=params,
    timeout=10,
)

In [None]:
# Decoded JSON for the Follow This Train request.
response.json()

## Locations API

In [None]:
# Query parameters for the Locations (positions) request on route 'red'.
params = dict(
    rt='red',
    key=CTA_API_KEY,
    outputType='JSON',
)

In [None]:
# Timeout so a stalled connection cannot hang the kernel indefinitely.
response = requests.get(
    "https://lapi.transitchicago.com/api/1.0/ttpositions.aspx",
    params=params,
    timeout=10,
)

In [None]:
# Decoded JSON for the Locations request on route 'red'.
response.json()

## Database for storing train run delays per stop

### SQLite

In [None]:
# Open (or create, if missing) the local SQLite database file and get a cursor.
conn = sqlite3.connect("cta_trains.db")
cur = conn.cursor()

In [None]:
# Create the train_delays table. `if not exists` makes the cell safe to re-run.
# SQLite has no `string` type: a declared type of "string" matches none of the
# TEXT affinity rules and falls back to NUMERIC affinity, which converts
# numeric-looking strings to numbers. `text` keeps names verbatim.
cur.execute("""
create table if not exists train_delays
(train_line text, run_number int, day date,
previous_station text, next_station text,
delay_in_minutes float)
""")

### MySQL

In [4]:
import os

# Connection settings come from the environment when set. The literals are
# only local-development fallbacks; don't put real credentials in the notebook.
mydb = mysql.connector.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    port=int(os.environ.get("MYSQL_PORT", "3306")),
    user=os.environ.get("MYSQL_USER", "myuser"),
    password=os.environ.get("MYSQL_PASSWORD", "mypassword"),
    database=os.environ.get("MYSQL_DATABASE", "mydatabase"),
)

In [5]:
mycursor = mydb.cursor()

# Sanity check: list the databases this connection can see.
mycursor.execute("SHOW DATABASES")
for database_row in mycursor:
    print(database_row)

('information_schema',)
('mydatabase',)
('performance_schema',)


### Train delays table

In [None]:
# One row per observed delay. `if not exists` lets the notebook be re-run
# against a database where the table already exists.
mycursor.execute("""
create table if not exists train_delays
(train_line VARCHAR(10), run_number VARCHAR(10), day date,
previous_station VARCHAR(30), next_station VARCHAR(30), end_station VARCHAR(30),
delay_in_minutes float, original_eta timestamp, latest_eta timestamp)
""")

In [None]:
# mycursor.execute("""
# alter table train_delays
# add end_station VARCHAR(30)
# """)

In [None]:
# All recorded delays: most recent day first, worst delays first within a day.
# Narrower projection kept for reference:
#   train_line, run_number, previous_station, next_station,
#   delay_in_minutes, date_format(original_eta, '%m/%d/%Y %H:%i:%S'),
#   date_format(latest_eta, '%m/%d/%Y %H:%i:%S')
train_delays_query = """
select 
*
from train_delays
order by day desc, delay_in_minutes desc
"""
mycursor.execute(train_delays_query)

result = mycursor.fetchall()
for delay_row in result:
    print(delay_row)

In [None]:
# mycursor.execute("""drop table train_delays""")

### Notifications table

In [None]:
# One row per raised delay alert; notification_sent flips once it is emailed.
# `if not exists` lets the notebook be re-run against an existing database.
mycursor.execute("""
create table if not exists delay_notifications
(notification_id int NOT NULL AUTO_INCREMENT, train_line VARCHAR(10),
station VARCHAR(30), end_station VARCHAR(30), num_delays int, alert_time timestamp,
notification_sent boolean, PRIMARY KEY(notification_id))
""")

In [None]:
# mycursor.execute("""drop table delay_notifications""")

### Subscriptions table

In [None]:
# Which user wants alerts for which (train line, station).
# `if not exists` lets the notebook be re-run against an existing database.
mycursor.execute("""
create table if not exists station_subscriptions
(subscription_id int NOT NULL AUTO_INCREMENT, user_id int,
train_line VARCHAR(10), station VARCHAR(30),
PRIMARY KEY (subscription_id))
""")

In [None]:
# mycursor.execute("""drop table station_subscriptions""")

### Add subscriptions

In [None]:
# Red Line stops
red_line_stops = ["Howard", "Jarvis", "Morse", 
    "Loyola", "Granville", "Thorndale", "Bryn Mawr", "Argyle", 
    "Wilson", "Sheridan", "Addison", "Belmont", "Fullerton", 
    "North/Clybourn", "Clark/Division", "Chicago", "Grand", 
    "Lake", "Monroe", "Jackson", "Harrison", 
    "Roosevelt", "Cermak-Chinatown", "Sox-35th", "47th", 
    "Garfield", "63rd", "69th", "79th", "87th", "95th/Dan Ryan"]

In [None]:
# Subscribe user 1 to every Red Line stop. Values are bound as query parameters
# (no string-built SQL, so a stop name containing a quote can't break the
# statement). Columns are named explicitly so the AUTO_INCREMENT id is
# generated without relying on column order. All rows go in with one commit.
mycursor.executemany(
    "insert into station_subscriptions (user_id, train_line, station) values (%s, %s, %s)",
    [(1, 'Red Line', stop) for stop in red_line_stops],
)

mydb.commit()  # Commit the changes

In [None]:
# Show every stored station subscription.
subscriptions_query = """
select *
from station_subscriptions
"""
mycursor.execute(subscriptions_query)

result = mycursor.fetchall()
for subscription in result:
    print(subscription)

### Users table

In [None]:
# Subscribers who receive delay emails. `if not exists` lets the notebook be
# re-run. The email column is widened from VARCHAR(30), which is too short for
# many real addresses (254 is the practical maximum address length). This only
# takes effect when the table is first created.
mycursor.execute("""
create table if not exists users
(user_id int NOT NULL AUTO_INCREMENT, first_name VARCHAR(30), last_name VARCHAR(30),
email VARCHAR(254), PRIMARY KEY(user_id))
""")

### Add users

In [None]:
# Add a subscriber. Columns are named so the AUTO_INCREMENT user_id is
# generated without depending on column order. Values are bound as parameters.
mycursor.execute(
    "insert into users (first_name, last_name, email) values (%s, %s, %s)",
    ('Jesse', 'Moderwell', 'jesse.moderwell@gmail.com'),
)

mydb.commit()  # Commit the changes

In [4]:
# Show every stored user.
users_query = """
select *
from users
"""
mycursor.execute(users_query)

result = mycursor.fetchall()
for user_row in result:
    print(user_row)

(1, 'Jesse', 'Moderwell', 'jesse.moderwell@gmail.com')


## Check train delays for new notifications to send

In [None]:
# Get delays by train line/ station
mycursor.execute("""
select train_line, next_station, count(*) as delays
from train_delays
where delay_in_minutes >= 5
group by train_line, next_station
""")

result = mycursor.fetchall()

for x in result:
    print(x)

In [None]:
# NOTE(review): this datetime value is replaced by the string-formatted
# curr_time in the following cells before it is ever used.
curr_time = datetime.now()

In [None]:
# Current local time in the "YYYY-MM-DD HH:MM:SS" form this notebook uses for
# SQL timestamp values.
now = datetime.now()
curr_time = now.strftime("%Y-%m-%d %H:%M:%S")
print(curr_time)

In [None]:
# Raise a notification for every (train line, station) row in `result` (from the
# 5+ minute delay count query) that has at least 3 delays.
curr_time = datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S")

# delay_notifications has 7 columns. The old positional insert supplied only 6
# values (no end_station), which MySQL rejects with a column-count error.
# Naming the columns fixes that and leaves end_station NULL, since `result`
# carries no end_station. Values are bound as parameters, not formatted into SQL.
insert_notification = """
insert into delay_notifications
(train_line, station, num_delays, alert_time, notification_sent)
values (%s, %s, %s, %s, %s)
"""
for train_line, station, num_delays in result:
    if num_delays >= 3:
        notification_row = (train_line, station, num_delays, curr_time, False)
        print(notification_row)
        mycursor.execute(insert_notification, notification_row)

# mysql.connector does not autocommit by default, so commit explicitly or the
# inserts are never persisted.
mydb.commit()
        

In [None]:
# Show all notifications with a compact alert timestamp.
notifications_query = """
select notification_id, train_line, station, num_delays, date_format(alert_time, '%m/%d/%Y %H:%i'),
notification_sent
from delay_notifications
"""
mycursor.execute(notifications_query)

result = mycursor.fetchall()
for notification_row in result:
    print(notification_row)

### Check notifications and send to subscribers

In [5]:
# Fetch every notification that has not been emailed yet. No time cutoff is
# applied here (an unused 3-hour `since_time` was removed). The subscriber
# query filters by alert_time instead.
mycursor.execute("""
select *
from delay_notifications
where notification_sent = False
""")

result = mycursor.fetchall()

print(result)

# (train_line, station, end_station, num_delays) per unsent notification, per the
# column order in the delay_notifications create-table cell.
curr_notifications = [list(notification[1:5]) for notification in result]

[(16, 'Red Line', 'Morse', '95th/Dan Ryan', 2, datetime.datetime(2024, 11, 19, 17, 50, 21), 0), (17, 'Red Line', 'Roosevelt', 'Howard', 2, datetime.datetime(2024, 11, 19, 21, 4, 2), 0)]


In [19]:
# Unsent notifications raised within the last hour. There is one row per
# subscriber of the notification's (train line, station), with that
# subscriber's email.
one_hour_ago = datetime.now() - timedelta(hours=1)
since_time = one_hour_ago.strftime("%Y-%m-%d %H:%M:%S")

query = f"""
select n.notification_id, s.train_line, s.station, n.end_station, u.email, n.num_delays
from station_subscriptions s
inner join delay_notifications n on (s.train_line, s.station) = (n.train_line, n.station)
inner join users u on s.user_id = u.user_id
where n.notification_sent = False
and n.alert_time >= '{since_time}'
"""
print(query)

mycursor.execute(query)
subscriber_notifications = mycursor.fetchall()



select n.notification_id, s.train_line, s.station, n.end_station, u.email, n.num_delays
from station_subscriptions s
inner join delay_notifications n on (s.train_line, s.station) = (n.train_line, n.station)
inner join users u on s.user_id = u.user_id
where n.notification_sent = False
and n.alert_time >= '2024-11-21 12:07:56'



In [18]:
# Rows: (notification_id, train_line, station, end_station, email, num_delays).
subscriber_notifications


[]

### Mark notification as sent

In [None]:
# mycursor.execute(f"""
# alter table delay_notifications
# set notification_sent = True
# where notification_id = '{notification[0]}'
# """)

# mydb.commit()


### Send notification email to subscribers

In [11]:
from config import SERVICE_EMAIL, SERVICE_EMAIL_PASSWORD

import smtplib, ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart


def _build_delay_message(notification):
    """Build the plain-text delay alert email for one subscriber row.

    Args:
        notification: One row from the subscriber query:
            (notification_id, train_line, station, end_station, email, num_delays).

    Returns:
        A MIMEMultipart message from SERVICE_EMAIL addressed to the subscriber.
    """
    _, train_line, station, end_station, receiver_email, num_delays = notification

    message = MIMEMultipart("alternative")
    message["Subject"] = f"{train_line} train(s) at {station} are delayed toward {end_station}"
    message["From"] = SERVICE_EMAIL
    # Was SERVICE_EMAIL, so every alert's To header named the sender instead of
    # the subscriber it was actually delivered to.
    message["To"] = receiver_email

    # Plain-text part only. The email client tries to render the last attached
    # part first, so an HTML part would be attached after this one.
    text = f"""
    There have been {num_delays} delayed {train_line} train(s) at {station} toward {end_station}
    in the last hour
    """
    message.attach(MIMEText(text, "plain"))
    return message


if subscriber_notifications:
    # One TLS connection and login for the whole batch, not one per email.
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=context) as server:
        server.login(SERVICE_EMAIL, SERVICE_EMAIL_PASSWORD)
        for notification in subscriber_notifications:
            message = _build_delay_message(notification)
            server.sendmail(SERVICE_EMAIL, notification[4], message.as_string())

            # Mark sent right after a successful send and commit immediately, so
            # a failure later in the batch doesn't re-send alerts already delivered.
            mycursor.execute(
                "update delay_notifications set notification_sent = True where notification_id = %s",
                (notification[0],),
            )
            mydb.commit()

## Fetch latest train data

In [None]:
def fetch_latest_etas(train_line: str, timeout: float = 10):
    """Fetch live train positions for one route from the CTA Locations API.

    Despite the name, this calls ``ttpositions.aspx`` (the Locations API).

    Args:
        train_line: Route code sent as ``rt`` (e.g. ``'red'``).
        timeout: Seconds to wait for the server. Without a timeout ``requests``
            can block forever, which would stall the polling loop that calls this.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.RequestException: on connection failures or timeout.
    """
    params = {'rt': train_line,
              'key': CTA_API_KEY,
              'outputType': 'JSON'}

    response = requests.get("https://lapi.transitchicago.com/api/1.0/ttpositions.aspx",
                            params=params, timeout=timeout)
    # Fail loudly on HTTP errors instead of trying to decode an error page as JSON.
    response.raise_for_status()

    return response.json()

In [None]:
# Try the fetcher once for route 'red'.
fetch_latest_etas('red')

## Create Kafka topic

In [None]:
from kafka.errors import TopicAlreadyExistsError

admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092",  # Replace with your Kafka broker address
)

topic_name = "train-etas"
num_partitions = 1
replication_factor = 1

new_topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)

try:
    admin_client.create_topics([new_topic])
except TopicAlreadyExistsError:
    # Re-running this cell against an existing topic is not an error.
    print(f"Topic '{topic_name}' already exists.")
finally:
    # Release the admin client's broker connections.
    admin_client.close()

## Delete Kafka Topic

In [None]:
admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092",  # Replace with your Kafka broker address
)

topic_name = 'train-etas'

try:
    admin_client.delete_topics([topic_name])
    print(f"Topic '{topic_name}' deleted successfully.")
except Exception as e:
    # Best-effort teardown: report the failure instead of raising.
    print(f"Error deleting topic '{topic_name}': {e}")
finally:
    # Release the admin client's broker connections either way.
    admin_client.close()

## Set up Kafka producer

In [None]:
def _serialize_json(value):
    """Encode a message value as UTF-8 JSON bytes for Kafka."""
    return json.dumps(value).encode('utf-8')


producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=_serialize_json,
)

## Set up a Kafka consumer to print messages from the topic

In [None]:
from kafka import KafkaConsumer

# Prints messages produced after the consumer starts (kafka-python's default
# auto_offset_reset is 'latest'). No group_id is set, so no offsets are committed.
consumer = KafkaConsumer('train-etas',
                         bootstrap_servers=['localhost:9092'])

# The loop runs until the cell is interrupted; close the consumer either way so
# its broker connections are released.
try:
    for message in consumer:
        print(message.value.decode('utf-8'))
finally:
    consumer.close()

## PyFlink: print the train-etas stream

In [None]:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
import json

env = StreamExecutionEnvironment.get_execution_environment()
# NOTE(review): FlinkKafkaConsumer needs the Flink Kafka connector jar available
# to the job (e.g. via env.add_jars("file:///...")) — confirm it is provided.

# Read each record from the train-etas topic as a raw string.
kafka_consumer = FlinkKafkaConsumer(
    topics=['train-etas'],
    deserialization_schema=SimpleStringSchema(),
    properties={'bootstrap.servers': 'localhost:9092'}
)

data_stream = env.add_source(kafka_consumer)

data_stream.print()

# DataStream calls only build the job graph; nothing is read from Kafka until
# the job is submitted. The Kafka source is unbounded, so this blocks until the
# job is cancelled or the kernel is interrupted.
env.execute("train-etas-print")

## Poll the API and send results to Kafka

In [None]:
def poll_api_and_send(train_line, interval_seconds=30, topic='train-etas'):
    """Poll the CTA positions API forever and publish each snapshot to Kafka.

    Args:
        train_line: Route code passed to fetch_latest_etas (e.g. ``'red'``).
        interval_seconds: Pause between polls, in seconds.
        topic: Kafka topic that receives each JSON snapshot via ``producer``.

    Never returns; interrupt the kernel to stop it.
    """
    while True:
        try:
            latest_data = fetch_latest_etas(train_line)
        except (requests.RequestException, ValueError) as exc:
            # A transient network/API failure (or an undecodable body) should not
            # kill the long-running poller: report it and retry next tick.
            print(f'failed to fetch train eta data: {exc}')
        else:
            producer.send(topic, latest_data)
            print('sent latest train eta data')

        time.sleep(interval_seconds)

In [None]:
# Start polling route 'red'. poll_api_and_send loops forever, so interrupt the
# kernel to stop it.
poll_api_and_send('red')