## Taxi Queries on a Structured Stream

We have 106 GB of NYC Yellow Taxi data that has been cleaned and normalized based on the [published schema](https://www1.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_yellow.pdf).


In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
import os

In [2]:
# Build the notebook's Spark session (or reuse an existing one) under the app name "readTaxi".
spark = (
    SparkSession.builder
    .appName("readTaxi")
    .getOrCreate()
)
spark  # bare expression at the end of the cell so the notebook displays the session

In [6]:
# Locations used by the notebook, all under the current user's home directory:
# input CSV directory, bad-records directory, query output root, checkpoint root.
taxi_file_dir, bad_records_path, taxi_output_dir, checkpoint_loc = map(
    os.path.expanduser,
    (
        "~/data/taxi_data_cleaned_18_standard/",
        "~/data/badrecords",
        "~/data/query_output",
        "~/data/checkpoints",
    ),
)

### Define schema ahead of time for structured streaming

In [4]:
# Column names and Spark types for the taxi records, listed in declaration order.
# The schema is spelled out by hand because the streaming reader is given it up front.
_TAXI_COLUMNS = [
    ("VendorID", IntegerType),
    ("tpep_pickup_datetime", TimestampType),
    ("tpep_dropoff_datetime", TimestampType),
    ("passenger_count", IntegerType),
    ("trip_distance", DoubleType),
    ("RatecodeID", IntegerType),
    ("store_and_fwd_flag", StringType),
    ("PULocationID", IntegerType),
    ("DOLocationID", IntegerType),
    ("payment_type", IntegerType),
    ("fare_amount", DoubleType),
    ("extra", DoubleType),
    ("mta_tax", DoubleType),
    ("tip_amount", DoubleType),
    ("tolls_amount", DoubleType),
    ("improvement_surcharge", DoubleType),
    ("total_amount", DoubleType),
    ("congestion_surcharge", DoubleType),
]

# Every field is nullable, which is what StructType.add() produces by default.
taxi_orig_schema = StructType(
    [StructField(col_name, col_type(), True) for col_name, col_type in _TAXI_COLUMNS]
)

### Define Stream Source

Also register the streaming DataFrame as a temporary SQL view (`nyyellowtaxi`) so it can be queried with SQL.

In [7]:
# Streaming CSV reader: explicit schema, no header row, one new file per trigger.
# NOTE(review): badRecordsPath appears to be a Databricks-specific option; confirm it
# has an effect on the Spark distribution this notebook runs on.
stream_reader = spark.readStream.schema(taxi_orig_schema).options(
    header="false",
    maxFilesPerTrigger=1,
    badRecordsPath=bad_records_path,
)

# The source: every CSV file that appears under taxi_file_dir.
taxi_input_df = stream_reader.csv(taxi_file_dir)

# Register the stream as a temp view so the SQL queries can refer to it by name.
taxi_input_df.createOrReplaceTempView("nyyellowtaxi")

### Queries

```sql
1. select passenger_count, year(tpep_pickup_datetime) year from nyyellowtaxi
2. select passenger_count, avg(total_amount) as avg_amount from nyyellowtaxi group by passenger_count
3. select passenger_count, year(tpep_pickup_datetime) year, count(*) total 
   from nyyellowtaxi group by passenger_count, year
4. select passenger_count, year(tpep_pickup_datetime) as year, 
    cast(trip_distance as int) as distance, 
    count(*) as total 
    from nyyellowtaxi
    group by passenger_count, year, distance
    order by year, total desc
```


In [9]:
# SQL text of each streaming query, in the same order as the numbered list above.
# Every statement reads from the `nyyellowtaxi` temp view; Python does not expand
# `$name` placeholders inside plain strings, so the view name is written out.
queries = [
    "select passenger_count, year(tpep_pickup_datetime) year from nyyellowtaxi",
    "select passenger_count, avg(total_amount) as avg_amount from nyyellowtaxi group by passenger_count",
    "select passenger_count, year(tpep_pickup_datetime) year, count(*) total FROM nyyellowtaxi group by passenger_count, year",
    """select passenger_count, year(tpep_pickup_datetime) as year, 
        cast(trip_distance as int) as distance, 
        count(*) as total 
        from nyyellowtaxi
        group by passenger_count, year, distance
        order by year, total desc""",
]

# Name the queries q1..qN. The range must run to len(queries) inclusive: with one
# name too few, zip() below would silently drop the last query.
query_names = ["q{}".format(i) for i in range(1, len(queries) + 1)]

# Each query gets its own output directory and checkpoint directory, keyed by its name.
out_dirs = [os.path.join(taxi_output_dir, n) for n in query_names]
checkpoint_locs = [os.path.join(checkpoint_loc, n) for n in query_names]

# One (name, sql, output_dir, checkpoint_dir) tuple per query.
name_query_outdir = list(zip(query_names, queries, out_dirs, checkpoint_locs))

In [10]:
def start_query(query_desc):
    """Start one streaming SQL query whose results are written out as CSV files.

    Args:
        query_desc: 4-tuple of (query name, SQL text, output directory,
            checkpoint directory).

    Returns:
        Whatever ``writeStream...start()`` returns for the started query.

    NOTE(review): the sink runs in "append" output mode; aggregating or sorted
    queries may be rejected by Spark in that mode -- confirm before starting them.
    """
    name, sql_text, sink_dir, checkpoint_dir = query_desc

    result_stream = spark.sql(sql_text)
    writer = result_stream.writeStream.queryName(name)
    writer = writer.format("csv").outputMode("append")
    writer = writer.option("path", sink_dir)
    # NOTE(review): startingOffsets looks like a source-side option; presumably it
    # has no effect on a file sink -- confirm before relying on it.
    writer = writer.option("startingOffsets", "earliest")
    writer = writer.option("checkpointLocation", checkpoint_dir)
    writer = writer.trigger(processingTime='1 seconds')
    return writer.start()

In [11]:
# Start only the first query; name_query_outdir[0] is its
# (name, sql, output_dir, checkpoint_dir) tuple.
q1 = start_query(name_query_outdir[0])

In [12]:
# Display the running query's current status (the dict shown in the cell output).
q1.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

In [13]:
# Stop the streaming query that was started as q1.
q1.stop()