In [24]:
%reload_ext autoreload
%autoreload 2
from pyspark.sql.functions import col, date_format
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
import pyspark.sql.types as t
from workers.Aliases import *
from workers import FareWorker, TripWorker
import config

In [5]:
def create_df_example(spark: SparkSession) -> DataFrame:
    """Return a small demo DataFrame created on the given Spark session.

    The frame holds three hard-coded rows and two nullable columns:
    ``Name`` (string) and ``Age`` (integer).

    :param spark: session used to materialise the DataFrame.
    :return: DataFrame with the example rows and explicit schema.
    """
    name_field = t.StructField("Name", t.StringType(), True)
    age_field = t.StructField("Age", t.IntegerType(), True)
    people_schema = t.StructType([name_field, age_field])
    people_rows = [("Alice", 20), ("Bob", 30), ("Charlie", 50)]
    return spark.createDataFrame(people_rows, people_schema)

In [101]:
# Obtain the Spark handle from the project's config module; it is passed to
# the TripWorker and FareWorker constructors in later cells.
spark = config.init_spark()
# Bare expression so the notebook renders the object as this cell's output.
spark

In [99]:
# Build the trip worker on the shared Spark handle; later cells read its `.df` attribute.
tripWorker = TripWorker.TripWorker(spark)

In [87]:
# Preview the first 10 rows of the trip data.
tripWorker.df.show(n=10)

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N|2013-01-01 15:11:48|2013-01-01 15:18:10|              4|              382|          1.0|      -73.978165|      40.757977|        -73.98984|        40.75117|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...| 

In [9]:
# Print column names, types and nullability of the trip DataFrame.
tripWorker.df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)


In [10]:
# Worker-level schema helper; in this notebook its printed output is identical
# to that of tripWorker.df.printSchema().
tripWorker.schema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- dropoff_latitude: float (nullable = true)


In [18]:
# Names of every numeric-typed column in the trip schema.
num_cols = [
    field.name
    for field in tripWorker.df.schema.fields
    if isinstance(field.dataType, t.NumericType)
]
# Summary statistics (count, mean, stddev, min, max) restricted to those columns.
tripWorker.df.describe(num_cols).show()

+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+
|summary|         rate_code|   passenger_count|trip_time_in_secs|     trip_distance|  pickup_longitude|   pickup_latitude| dropoff_longitude| dropoff_latitude|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+
|  count|         171899205|         171899205|        171899205|         171899205|         171899205|         171899205|         171899205|        171899205|
|   mean|1.0270787523421065|1.7116061647870913|757.7813952775407|2.9229047508589625|-72.55375877214989|39.808676497132545|-72.55916906300426|39.81256532636291|
| stddev|0.3005756867875085|1.3771786361954093|560.5441541100786| 20.95747950872376|10.967169280209191| 8.655816513663162|10.938848778547408|8.685174083633768|
|    min|                 0|            

In [27]:
# List every distinct rate_code value; the 1000-row cap keeps show() from
# stopping at its default of 20 rows.
(
    tripWorker.df
    .select(rate_code)
    .distinct()
    .show(1000)
)

+---------+
|rate_code|
+---------+
|        1|
|        6|
|        3|
|        5|
|        4|
|        8|
|        2|
|        0|
|      210|
|       28|
|        7|
|        9|
|       65|
|      128|
|       10|
|       13|
|       15|
|       79|
|      221|
|       17|
|      208|
|      206|
|       77|
|      239|
|      200|
|       16|
+---------+


In [28]:
# List every distinct passenger_count value; the 1000-row cap keeps show() from
# stopping at its default of 20 rows.
(
    tripWorker.df
    .select(passenger_count)
    .distinct()
    .show(1000)
)

+---------------+
|passenger_count|
+---------------+
|              1|
|              6|
|              3|
|              5|
|              4|
|              2|
|              0|
|            208|
|              9|
|            255|
|              7|
|              8|
|            129|
+---------------+


In [28]:
# Distinct whole-minute trip durations above 10 minutes, in ascending order.
# The int cast discards the fractional part of secs / 60.
(
    tripWorker.df
    .select((col(trip_time_in_secs) / 60).cast("int").alias("trip_time_in_min"))
    .where(col('trip_time_in_min') > 10)
    .distinct()
    .sort('trip_time_in_min')
    .show(10000)
)

+----------------+
|trip_time_in_min|
+----------------+
|              11|
|              12|
|              13|
|              14|
|              15|
|              16|
|              17|
|              18|
|              19|
|              20|
|              21|
|              22|
|              23|
|              24|
|              25|
|              26|
|              27|
|              28|
|              29|
|              30|
|              31|
|              32|
|              33|
|              34|
|              35|
|              36|
|              37|
|              38|
|              39|
|              40|
|              41|
|              42|
|              43|
|              44|
|              45|
|              46|
|              47|
|              48|
|              49|
|              50|
|              51|
|              52|
|              53|
|              54|
|              55|
|              56|
|              57|
|              58|
|              59|
|           

In [22]:
# Distinct trip_distance values whose value / 1000 exceeds 1, shown next to the
# scaled value and ordered by it.
# NOTE(review): the "in km" label only holds if trip_distance is stored in
# metres; the describe() output in this notebook (mean of about 2.92) suggests
# a larger unit -- confirm the unit before relying on this column.
(
    tripWorker.df
    .select(trip_distance, (col(trip_distance) / 1000.0).alias(trip_distance + ' in km'))
    .where(col(trip_distance + ' in km') > 1)
    .distinct()
    .sort(trip_distance + ' in km')
    .show(10000)
)

+-------------+-------------------+
|trip_distance|trip_distance in km|
+-------------+-------------------+
|       1000.5|             1.0005|
|       1005.0|              1.005|
|       1040.0|               1.04|
|       1047.4| 1.0474000244140624|
|       1232.0|              1.232|
|       1300.0|                1.3|
|       1300.5|             1.3005|
|       1313.1| 1.3130999755859376|
|       1320.6| 1.3205999755859374|
|       1350.0|               1.35|
|       1440.0|               1.44|
|       1483.1| 1.4830999755859375|
|       1485.2|  1.485199951171875|
|       1500.0|                1.5|
|       1566.6| 1.5665999755859374|
|       1574.6| 1.5745999755859375|
|       1600.0|                1.6|
|       1650.0|               1.65|
|       1800.0|                1.8|
|       2001.9| 2.0019000244140623|
|       2139.5|             2.1395|
|       2500.5|             2.5005|
|       2800.5|             2.8005|
|       2840.0|               2.84|
|       3008.3|  3.008300048

In [51]:
# Build the fare worker on the same Spark handle that the trip worker uses.
fareWorker = FareWorker.FareWorker(spark)

In [30]:
# Preview the fare data (show() without arguments prints at most 20 rows).
fareWorker.df.show()

+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|vendor_id|    pickup_datetime|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+---------+-------------------+------------+-----------+---------+-------+----------+------------+------------+
|F1C63842BC47D1C30...|9D0A6E44AF6C07CD7...|      VTS|2013-01-13 04:14:00|         CRD|       32.0|      0.5|    0.5|       6.5|         6.5|        46.0|
|ED9B6E969C35E1631...|1067C6EE7965AACBC...|      VTS|2013-01-13 03:46:00|         CRD|      107.5|      0.5|    0.5|      21.6|         4.8|       134.9|
|203F59092282F76DA...|711947488A628488F...|      VTS|2013-01-13 04:22:00|         CRD|       33.0|      0.5|    0.5|       6.7|         2.2|        42.9|
|AF654FCA5D10F5E1A...|47EB5C6F23BFD50C4...|      VTS|2013-01-13 04:46:00|   

In [23]:
# Names of every numeric-typed column in the fare schema (rebinds num_cols).
num_cols = [
    field.name
    for field in fareWorker.df.schema.fields
    if isinstance(field.dataType, t.NumericType)
]
# Summary statistics (count, mean, stddev, min, max) restricted to those columns.
fareWorker.df.describe(num_cols).show()

+-------+------------------+------------------+-------+------------------+-----------------+------------------+
|summary|       fare_amount|         surcharge|mta_tax|        tip_amount|     tolls_amount|      total_amount|
+-------+------------------+------------------+-------+------------------+-----------------+------------------+
|  count|           1450887|           1450887|1450887|           1450887|          1450887|           1450887|
|   mean|29.863166580163966|0.6842392894832685|    0.5| 6.770913894891303|5.291119777738701|43.109572573213896|
| stddev| 9.264048374108768|0.2422062176420006|    0.0|3.1444275597553943|1.036669278240577|11.643178495675743|
|    min|              2.42|              0.02|    0.5|              0.01|             0.01|              3.86|
|    max|             342.5|              10.5|    0.5|             818.8|            110.0|            855.63|
+-------+------------------+------------------+-------+------------------+-----------------+------------

In [11]:
# One row per payment_type with its average total amount, computed by the fare worker.
avg_total_amount_by_payment_type_df = fareWorker.avg_total_amount_by_payment_type()
avg_total_amount_by_payment_type_df.show()
# Hand the result to the project's CSV helper; the second argument is presumably
# the output name -- confirm in config.write_to_csv.
config.write_to_csv(avg_total_amount_by_payment_type_df, 'avg_total_amount_by_payment_type')

+------------+--------------------------------+
|payment_type|avg_total_amount_by_payment_type|
+------------+--------------------------------+
|         UNK|               41.46988988587174|
|         CRD|               43.10881165649835|
|         DIS|               47.32923111548791|
|         CSH|              57.216788711919115|
|         NOC|                67.1824008178711|
+------------+--------------------------------+


In [55]:
# One row per rate_code with avg_fare_amount_per_km; the fare DataFrame is passed
# in, presumably to be combined with the trip data inside the worker.
# NOTE(review): the displayed values (roughly 1500-4200 per km) look implausibly
# high; check the distance unit/conversion used by the worker.
avg_fare_amount_per_km_by_rate_df = tripWorker.avg_fare_amount_per_km_by_rate_code(fareWorker.df)
avg_fare_amount_per_km_by_rate_df.show()
# CSV export is disabled for this result.
# NOTE(review): the disabled call names `avg_total_amount_per_km_by_rate_df`, but the
# variable assigned in this cell is `avg_fare_amount_per_km_by_rate_df`; correct the
# name before re-enabling the line.
#config.write_to_csv(avg_total_amount_per_km_by_rate_df, 'avg_total_amount_per_km_by_rate')

+---------+----------------------+
|rate_code|avg_fare_amount_per_km|
+---------+----------------------+
|        0|    3089.0213696172023|
|        1|     3059.222570734489|
|        2|    2678.5714192572077|
|        3|     4155.872212261154|
|        4|    3554.3017169639616|
|        5|    2900.4227181682363|
|        6|    3666.6666666666665|
|        7|    1519.3370245798492|
|      210|     4222.222222222223|
+---------+----------------------+


In [107]:
# Single-row result holding min(tip_amount) and max(tip_amount) from the fare worker.
# NOTE(review): "cache" in the method and output names presumably means "cash"
# (payment type); the max shown here (50.0) is below the overall tip_amount max
# in the describe() output, so some filter is applied -- confirm in FareWorker.
min_max_cache_tip_df = fareWorker.min_max_cache_tip()
min_max_cache_tip_df.show()
# Hand the result to the project's CSV helper under the given name.
config.write_to_csv(min_max_cache_tip_df, 'min_max_cache_tip')

+---------------+---------------+
|min(tip_amount)|max(tip_amount)|
+---------------+---------------+
|           0.01|           50.0|
+---------------+---------------+


In [46]:
# Revenue per vendor and month from the fare worker; preview the first 20 rows.
# NOTE(review): every displayed row carries the same total_revenue alongside
# medallion/hack_license, which suggests a per-row window total rather than a
# grouped aggregate -- confirm this is the intended shape before exporting.
vendor_month_revenue_df = fareWorker.vendor_month_revenue()
vendor_month_revenue_df.show(20)
# Hand the result to the project's CSV helper under the given name.
config.write_to_csv(vendor_month_revenue_df, 'vendor_month_revenue')

+--------------------+--------------------+---------+-----+------------------+
|           medallion|        hack_license|vendor_id|month|     total_revenue|
+--------------------+--------------------+---------+-----+------------------+
|A6E46CCD4542CC4A0...|8DF90199AA2703679...|      CMT|    1|2220228.5545344353|
|6C7810EA10F4A6671...|47D9A7E7235DDDD5E...|      CMT|    1|2220228.5545344353|
|546D784C4A13459CC...|4E385978D48B76BAD...|      CMT|    1|2220228.5545344353|
|E13D1812CB788966D...|D004B04F9BF03BD36...|      CMT|    1|2220228.5545344353|
|8B92CB499DFC80FC3...|AB3999D3373DE336D...|      CMT|    1|2220228.5545344353|
|3D423211B86371C07...|74D0F543A2B86D7ED...|      CMT|    1|2220228.5545344353|
|84A517403D0273650...|CDA65A99A4DC0D862...|      CMT|    1|2220228.5545344353|
|6C1132EF70BC0A7DB...|5DE2A4E18BDA9A76C...|      CMT|    1|2220228.5545344353|
|B6AA3B5DE51F00063...|DDB614CBFFB2F7FF3...|      CMT|    1|2220228.5545344353|
|184E74BE36BCEB1FD...|6E323D0D5405473A4...|      CMT

In [63]:
# Trips (presumably those costing over 100 dollars -- confirm in TripWorker) with
# an average_trip_time_over_100 column; the fare DataFrame is supplied by the caller.
# In the displayed output that column behaves like a running mean of
# trip_time_in_secs across consecutive rows sharing medallion/hack_license.
trip_and_average_trip_over_100_dollars_df = tripWorker.trip_and_average_trip_over_100_dollars(fareWorker.df)
trip_and_average_trip_over_100_dollars_df.show()
# Hand the result to the project's CSV helper under the given name.
config.write_to_csv(trip_and_average_trip_over_100_dollars_df, 'trip_and_average_trip_over_100_dollars')


+--------------------+--------------------+---------+-----------------+--------------------------+
|           medallion|        hack_license|vendor_id|trip_time_in_secs|average_trip_time_over_100|
+--------------------+--------------------+---------+-----------------+--------------------------+
|4A510B57ADC591D08...|000B8D660A329BBDB...|      CMT|             3389|                    3389.0|
|DA707A2C66E8769A0...|00117D7CCD47D125E...|      CMT|             2379|                    2379.0|
|FC51A43508446C316...|001916B0A5713FF05...|      VTS|             2280|                    2280.0|
|DC4867C1F2D5D2986...|00360896E0AB8AEAE...|      VTS|             2220|                    2220.0|
|E51AFB4DE27D61EAA...|003C68DFE1EBE1205...|      CMT|             1907|                    1907.0|
|E51AFB4DE27D61EAA...|003C68DFE1EBE1205...|      CMT|             3237|                    2572.0|
|E51AFB4DE27D61EAA...|003C68DFE1EBE1205...|      CMT|             3674|        2939.3333333333335|
|8F7D73196

In [69]:
# One row per rate_code with an average fare amount; per the method name this is
# restricted to summer trips (the month filter lives in TripWorker -- not visible here).
average_fare_amount_by_rate_in_summer_df = tripWorker.average_fare_amount_by_rate_in_summer(fareWorker.df)
average_fare_amount_by_rate_in_summer_df.show()
# Hand the result to the project's CSV helper under the given name.
config.write_to_csv(average_fare_amount_by_rate_in_summer_df, 'average_fare_amount_by_rate_in_summer')


+---------+-----------------------+
|rate_code|avg_fare_amount_by_type|
+---------+-----------------------+
|        5|                   17.6|
|        1|      29.78875793346561|
|        0|     30.232558139534884|
|        2|     36.166666666666664|
|      210|                   38.0|
|        3|                   74.5|
|        4|      78.75045863533488|
+---------+-----------------------+


In [105]:
# One row per passenger_count with an aggregated trip-distance figure
# (presumably the mean -- confirm in TripWorker); uses trip data only.
trip_distance_by_passenger_count_dependency_df = tripWorker.trip_distance_by_passenger_count_dependency()
trip_distance_by_passenger_count_dependency_df.show()

+---------------+-------------------------------------------+
|passenger_count|trip_distance_by_passenger_count_dependency|
+---------------+-------------------------------------------+
|              1|                          2.864331676228945|
|              6|                         2.9664856579521315|
|              3|                          2.995664093904258|
|              5|                          3.005980546129041|
|              4|                         3.0704309022464362|
|              2|                         3.1326836085062437|
|              9|                           3.40529416753527|
|              7|                           4.07684211040798|
|              8|                          5.260000059237847|
+---------------+-------------------------------------------+


In [106]:
# Export the passenger-count/distance result computed in the previous cell via
# the project's CSV helper.
config.write_to_csv(trip_distance_by_passenger_count_dependency_df, 'trip_distance_by_passenger_count_dependency')