In [1]:
#spark setup
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("NYC Taxi Big Data Task1") \
    .getOrCreate()


In [2]:
#load Dataset
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/content/yellow_tripdata_2015-01.csv")
df.printSchema()
df.count()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)



141355

In [5]:
#Remove Null Values
from pyspark.sql.functions import col

df_clean = df.dropna()
df_clean.count()

141355

In [4]:
#Remove Invalid Records
df_clean = df_clean.filter(col("passenger_count") > 0)
df_clean = df_clean.filter(col("trip_distance") > 0)
df_clean = df_clean.filter(col("fare_amount") > 0)
df_clean = df_clean.filter(col("total_amount") > 0)

In [6]:
#Datetime Conversion
from pyspark.sql.functions import to_timestamp

df_clean = df_clean.withColumn(
    "tpep_pickup_datetime",
    to_timestamp("tpep_pickup_datetime")
).withColumn(
    "tpep_dropoff_datetime",
    to_timestamp("tpep_dropoff_datetime")
)


In [7]:
#Trip Duration Feature
from pyspark.sql.functions import unix_timestamp

df_clean = df_clean.withColumn(
    "trip_duration_minutes",
    (unix_timestamp("tpep_dropoff_datetime") -
     unix_timestamp("tpep_pickup_datetime")) / 60
)

df_clean = df_clean.filter(col("trip_duration_minutes") > 0)



In [8]:
#Cache
df_clean.cache()
df_clean.count()


141209

In [9]:
#Adding review text column
from pyspark.sql.functions import col, when, lit

df_clean = df_clean.withColumn(
    "review_text",
    when((col("tip_amount") > 5) & (col("fare_amount") < 50),
         lit("The ride was excellent, smooth and the driver was very polite"))
    .when((col("tip_amount") == 0) & (col("fare_amount") > 50),
          lit("The ride was too expensive and the experience was disappointing"))
    .when(col("trip_distance") < 2,
          lit("The trip was short and completed quickly"))
    .otherwise(
        lit("The ride was average and reached the destination on time")
    )
)


In [10]:
#adding rating column
df_clean = df_clean.withColumn(
    "rating",
    when(col("tip_amount") > 5, 5)
    .when((col("tip_amount") > 0) & (col("tip_amount") <= 5), 4)
    .when(col("tip_amount") == 0, 2)
    .otherwise(3)
)


In [11]:
#Adding sentiment label
df_clean = df_clean.withColumn(
    "sentiment_label",
    when(col("rating") >= 4, "Positive")
    .when(col("rating") == 3, "Neutral")
    .otherwise("Negative")
)


In [12]:
df_clean.select(
    "fare_amount",
    "tip_amount",
    "trip_distance",
    "review_text",
    "rating",
    "sentiment_label"
).show(10, truncate=False)


+-----------+----------+-------------+---------------------------------------------------------------+------+---------------+
|fare_amount|tip_amount|trip_distance|review_text                                                    |rating|sentiment_label|
+-----------+----------+-------------+---------------------------------------------------------------+------+---------------+
|12.0       |3.25      |1.59         |The trip was short and completed quickly                       |4     |Positive       |
|14.5       |2.0       |3.3          |The ride was average and reached the destination on time       |4     |Positive       |
|9.5        |0.0       |1.8          |The trip was short and completed quickly                       |2     |Negative       |
|3.5        |0.0       |0.5          |The trip was short and completed quickly                       |2     |Negative       |
|15.0       |0.0       |3.0          |The ride was average and reached the destination on time       |2     |Negative 

In [13]:
                            ##INSIGHTS##
#Peak Pickup Hours
from pyspark.sql.functions import hour

df_clean.withColumn(
    "pickup_hour", hour("tpep_pickup_datetime")
).groupBy("pickup_hour") \
 .count() \
 .orderBy("count", ascending=False) \
 .show()


+-----------+-----+
|pickup_hour|count|
+-----------+-----+
|         19| 9983|
|         18| 9220|
|         14| 8269|
|         21| 7959|
|         20| 7661|
|         17| 7471|
|         16| 7404|
|         15| 7360|
|         22| 7174|
|         12| 7032|
|         23| 6834|
|          9| 6692|
|         13| 6684|
|          8| 6138|
|          7| 5950|
|         10| 5865|
|         11| 5697|
|          0| 5098|
|          1| 3381|
|          2| 2545|
+-----------+-----+
only showing top 20 rows


In [14]:
#Passenger Count
df_clean.groupBy("passenger_count") \
    .count() \
    .orderBy("passenger_count") \
    .show()

+---------------+-----+
|passenger_count|count|
+---------------+-----+
|              0|   90|
|              1|99907|
|              2|19989|
|              3| 5806|
|              4| 2782|
|              5| 7632|
|              6| 5002|
|              9|    1|
+---------------+-----+



In [15]:
#Average Trip Distance per Hour
df_clean.withColumn(
    "pickup_hour", hour("tpep_pickup_datetime")
).groupBy("pickup_hour") \
 .avg("trip_distance") \
 .orderBy("pickup_hour") \
 .show()

+-----------+------------------+
|pickup_hour|avg(trip_distance)|
+-----------+------------------+
|          0| 3.263226755590426|
|          1|3.1609553386572022|
|          2|3.2678978388998012|
|          3|3.3719797468354478|
|          4| 4.158131776480403|
|          5| 4.500088709677422|
|          6|3.4000672834314547|
|          7|2.8659563025210106|
|          8|2.4510182469859867|
|          9|2.4255947399880453|
|         10|2.4069889173060544|
|         11|2.4674741091802703|
|         12|2.4381086461888497|
|         13| 2.559625972471579|
|         14|2.7721102914499935|
|         15| 2.804226902173911|
|         16|2.7549473257698556|
|         17|2.6263057154330065|
|         18| 2.570998915401303|
|         19| 2.542279875788838|
+-----------+------------------+
only showing top 20 rows


In [16]:
#Revenue by Payment Type
df_clean.groupBy("payment_type") \
    .sum("total_amount") \
    .show()


+------------+------------------+
|payment_type| sum(total_amount)|
+------------+------------------+
|           1|1431109.0700001924|
|           3| 4666.050000000002|
|           4|1349.7699999999995|
|           2| 651248.3699996863|
+------------+------------------+



In [17]:
#Avg Fare vs Distance
df_clean.selectExpr(
    "avg(trip_distance) as avg_distance",
    "avg(total_amount) as avg_fare"
).show()

+------------------+------------------+
|      avg_distance|          avg_fare|
+------------------+------------------+
|2.7749193748273817|14.789236238494434|
+------------------+------------------+



In [18]:
df_clean.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/content/task1_big_data_analysis_using_pyspark")
