In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import hour
from pyspark.sql import functions as F


In [2]:
# Initialize Spark session
spark = SparkSession.builder \
    .appName("Taxi Trips Processing") \
    .getOrCreate()

# Load the large dataset 

taxi_trips = spark.read.csv("taxi_trips.csv", header=True, inferSchema=True)

# Show a sample of the data
taxi_trips.show(5)

24/10/18 13:23:39 WARN Utils: Your hostname, Autochek-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.96.65 instead (on interface en0)
24/10/18 13:23:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/18 13:23:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/10/18 13:23:41 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/10/18 13:23:56 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

+--------------------+--------------------+--------------------+------------+----------+
|          unique_key|             taxi_id|trip_start_timestamp|trip_seconds|trip_miles|
+--------------------+--------------------+--------------------+------------+----------+
|aee47615b43b47807...|d34bb905b4c5c5e6e...| 2020-07-07 19:15:00|          56|       0.0|
|62c522d608b792d3f...|841f99fdfdf5cedb4...| 2020-07-07 05:15:00|         562|      5.06|
|4e6c3a495af8990e9...|9361b5b6a56dbfbc2...| 2020-07-07 10:45:00|         415|      1.39|
|e04451f3e6fb44d75...|3ff6ae822a41ebadf...| 2020-07-07 10:45:00|        2368|      9.82|
|53f1285b865df1cf7...|78d45b51ff03d70f0...| 2020-07-07 13:30:00|        2940|       0.0|
+--------------------+--------------------+--------------------+------------+----------+
only showing top 5 rows



In [4]:
#Length of dataframe/Total number of trips

total_trips = taxi_trips.count()
print("Total number of trips:", total_trips)



Total number of trips: 21160773


                                                                                

In [8]:
#Total miles driven

total_miles = taxi_trips.agg({"trip_miles": "sum"}).collect()[0][0]
print("Total miles driven:", total_miles)



Total miles driven: 99230999.98000765


                                                                                

In [10]:
total_miles_ = taxi_trips.select(sum("trip_miles")).collect()[0][0]
total_miles_ 

                                                                                

99230999.98000765

In [12]:

total_miles_per_taxi = taxi_trips.groupBy("taxi_id").agg(
    F.sum("trip_miles").alias("total_miles")
)
print("Total miles per taxi")

total_miles_per_taxi.show(10)

Total miles per taxi




+--------------------+------------------+
|             taxi_id|       total_miles|
+--------------------+------------------+
|b83ba3da1a4bc2a68...| 43232.10000000001|
|7da8dfbf5db7bed94...| 54406.87999999999|
|b5bf5d282fa4191c6...|               0.0|
|195a4d0fc8a42120e...|339.20000000000033|
|d2c2d4128d6597a3b...| 48855.40999999998|
|f6d1b6c930d62f6d8...|          27515.33|
|a8aee50b5b0787156...| 34502.00000000001|
|bb7eb49d01457ba3d...| 7871.700000000001|
|79a7b43709940cecf...|10930.400000000003|
|bdba06f86d8822d4d...| 8643.960000000001|
+--------------------+------------------+
only showing top 10 rows



                                                                                

In [15]:

miles_duration_by_hour = taxi_trips.withColumn("trip_hour", hour("trip_start_timestamp")).groupBy("trip_hour").agg(
    F.sum("trip_miles").alias("total_miles"),
    F.avg("trip_seconds").alias("avg_trip_duration")
)

print("Miles duration per hour")
miles_duration_by_hour.show(30)

Miles duration per hour




+---------+------------------+------------------+
|trip_hour|       total_miles| avg_trip_duration|
+---------+------------------+------------------+
|       12| 5401278.970000022|  920.660089795414|
|       22|4719400.8900000015|1001.1222857578178|
|        1| 2305076.119999995| 933.0665880953133|
|       13| 6009550.360000024|  965.113814081403|
|       16| 6582993.340000044|1146.6154760037414|
|        6|1404563.2699999989|1012.8341476452665|
|        3| 878957.1300000008| 758.9463622278126|
|       20|  6160129.02000003|1027.8626126196364|
|        5| 835480.2600000002| 814.6572536850272|
|       19| 6134234.240000029|1080.5957855575357|
|       15| 6492258.060000039|1072.3100385616226|
|        9|3873350.8299999926| 924.2295458349023|
|       17| 6403516.420000037|1140.8044367157993|
|        4| 667062.7200000011| 716.7707961482294|
|        8|2977375.5999999996| 956.7090765195665|
|       23| 3700058.209999991| 945.2392627050642|
|        7| 2099158.789999998|1016.4470115076576|


                                                                                