In [1]:
import pyspark.sql.functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame

In [2]:
# Spark session for this notebook, configured for the spark-bigquery connector.
spark = (
    SparkSession.builder
    .appName('arrival_ml')
    # connector jar, fetched by URL when the session starts
    .config('spark.jars', 'https://storage.googleapis.com/spark-lib/bigquery/spark-3.4-bigquery-0.34.0.jar')
    # BigQuery project and credentials file handed to the connector
    .config('parentProject', 'cta-tracking')
    .config('credentialsFile', 'cta-tracking-6135a30a5c0c.json')
    .getOrCreate()
)

# Switch the session to the legacy time parser policy.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

23/12/12 04:40:07 WARN Utils: Your hostname, codespaces-05ff3a resolves to a loopback address: 127.0.0.1; using 172.16.5.4 instead (on interface eth0)
23/12/12 04:40:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/12 04:40:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


DataFrame[key: string, value: string]

In [3]:
def load_data(file_path: str):
    """Read a CSV file into a DataFrame (development helper; may move to BQ later).

    Uses the module-level ``spark`` session. The file is read with a header
    row and an explicit schema, and ``arrT`` is parsed with the
    ``MM/dd/yyyy HH:mm:ss`` pattern.

    param:
        file_path: path of the CSV file to read
    return:
        DataFrame with columns destNm, nextStaNm, arrT, isApp, isDly
    """
    # (column name, Spark type) pairs; every column is nullable
    columns = [
        ("destNm", StringType()),
        ("nextStaNm", StringType()),
        ("arrT", TimestampType()),
        ("isApp", IntegerType()),
        ("isDly", IntegerType()),
    ]
    csv_schema = StructType(
        [StructField(name, dtype, nullable=True) for name, dtype in columns]
    )

    reader = spark.read.option('timestampFormat', 'MM/dd/yyyy HH:mm:ss').schema(csv_schema)
    return reader.csv(file_path, header=True)

In [4]:
def load_bq_table(spark, months: int = 2):
    '''Load the most recent months of the blue-line table from BigQuery.

    param:
        spark: active SparkSession with the BigQuery connector available
        months: how many months back from the current date to load,
                filtered on resp_time (default 2)
    return:
        DataFrame with destNM, nextStaNm, arrT, isApp, isDly, where arrT
        has been cast to timestamp
    raise:
        ValueError: if months is not a positive integer
    '''
    # Validate before the value is interpolated into the SQL text below.
    if isinstance(months, bool) or not isinstance(months, int) or months < 1:
        raise ValueError(f'months must be a positive integer, got {months!r}')

    # The connector's `query` option requires views to be enabled and a
    # dataset in which the query result can be materialized.
    spark.conf.set("viewsEnabled", "true")
    spark.conf.set("materializationDataset", "line_stops")

    sql_statement = f'''
        SELECT destNM, nextStaNm, arrT, isApp, isDly
        FROM `cta-tracking.line_stops.blue`
        WHERE resp_time >= DATE_SUB(CURRENT_DATE, INTERVAL {months} MONTH)
    '''
    df = (
        spark.read
        .format('bigquery')
        .option('query', sql_statement)
        .load()
    )

    # NOTE(review): isApp / isDly are returned with whatever type BigQuery
    # gives them, while load_data declares them IntegerType - confirm whether
    # they should be cast here as well.
    return df.withColumn('arrT', func.col('arrT').cast(TimestampType()))

In [5]:
# Load the BigQuery data and print the column types that came back.
df = load_bq_table(spark=spark)
df.printSchema()

root
 |-- destNM: string (nullable = true)
 |-- nextStaNm: string (nullable = true)
 |-- arrT: timestamp (nullable = true)
 |-- isApp: string (nullable = true)
 |-- isDly: string (nullable = true)



In [8]:
def arrival_morning(raw_data: DataFrame,
                    stop: str = 'Division',
                    before_hour: int = 10,
                    destinations: tuple = ('Forest Park', 'UIC-Halsted'))-> DataFrame:
    '''keep the morning records of trains approaching a stop in one direction
        param:
            raw_data: initial data from data load
            stop: name of the stop the train is approaching (nextStaNm)
            before_hour: keep rows whose arrT hour is strictly below this value
            destinations: destNm values that identify the direction of travel
        return:
            the matching rows of raw_data, sorted by arrT
    '''
    is_morning = func.hour(func.col('arrT')) < before_hour
    is_approaching = func.col('isApp') == 1
    next_is_stop = func.col('nextStaNm') == stop
    # Spark resolves column names case-insensitively by default, so this
    # matches both the destNm (CSV) and destNM (BigQuery) spellings.
    heads_to_destination = func.col('destNm').isin(list(destinations))

    df_approach = raw_data.filter(
        is_morning & is_approaching & next_is_stop & heads_to_destination
    )
    return df_approach.sort('arrT')

# Morning arrivals for the default stop ('Division').
df_morning = arrival_morning(raw_data=df)

In [9]:
def time_between_arrival(arrival: DataFrame
                         )-> DataFrame:
    '''add the gap to the following arrival on the same calendar day
        param:
            arrival: arrival records with a timestamp column arrT
        return:
            the input plus next_arrT (the next arrT within the same day) and
            arrival_diff (next_arrT minus arrT, in seconds); rows whose
            arrival_diff is null are dropped
    '''
    # one window per calendar day, ordered by arrival time
    same_day = Window.partitionBy(func.date_format("arrT", "yyyy-MM-dd")).orderBy("arrT")
    seconds_to_next = func.col("next_arrT").cast("long") - func.col("arrT").cast("long")

    return (
        arrival
        .withColumn('next_arrT', func.lead('arrT').over(same_day))
        # when() without otherwise() yields null when there is no next arrival
        .withColumn('arrival_diff',
                    func.when(func.col('next_arrT').isNotNull(), seconds_to_next))
        .dropna(subset=['arrival_diff'])
    )

# Preview the gaps between consecutive morning arrivals.
time_between_arrival(arrival=df_morning).show()

+-----------+---------+-------------------+-----+-----+-------------------+------------+
|     destNM|nextStaNm|               arrT|isApp|isDly|          next_arrT|arrival_diff|
+-----------+---------+-------------------+-----+-----+-------------------+------------+
|Forest Park| Division|2023-10-12 08:12:54|    1|    0|2023-10-12 08:21:13|         499|
|Forest Park| Division|2023-10-12 08:21:13|    1|    0|2023-10-12 08:25:13|         240|
|Forest Park| Division|2023-10-12 08:25:13|    1|    0|2023-10-12 08:26:54|         101|
|Forest Park| Division|2023-10-12 08:26:54|    1|    0|2023-10-12 08:47:16|        1222|
|Forest Park| Division|2023-10-12 08:47:16|    1|    0|2023-10-12 09:09:16|        1320|
|Forest Park| Division|2023-10-12 09:09:16|    1|    0|2023-10-12 09:11:16|         120|
|Forest Park| Division|2023-10-12 09:11:16|    1|    0|2023-10-12 09:31:12|        1196|
|Forest Park| Division|2023-10-12 09:31:12|    1|    0|2023-10-12 09:37:14|         362|
|Forest Park| Divisio