### PySpark Module 5 Assignment

```
Dataset Description:
* dispatching_base_number: The base station ID
* date: The date of the record
* active_vehicles: The number of active vehicles
* trips: The number of trips
```


#### Tasks to be Done


In [1]:
#configuring spark
import findspark

# Locate the Spark installation and hand that path straight to init(),
# which makes pyspark importable from this kernel.
findspark.init(findspark.find())

In [2]:
from pyspark.sql import SparkSession

# Build (or reuse) a session running locally on two worker threads.
spark = (
    SparkSession.builder
    .master('local[2]')
    .appName('UberTripsDataAnalysis')
    .getOrCreate()
)
spark

In [3]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,DateType

# Explicit schema for the Uber CSV so the reader does not have to infer types.
# Every field is declared nullable: the schema printed later in this notebook
# reports `nullable = true` for dispatching_base_number as well, i.e. a
# nullable=False flag is not enforced when the file is read. If a non-null
# base ID is required, check it explicitly after loading
# (e.g. filter on `dispatching_base_number IS NOT NULL`).
schema = StructType([
    StructField("dispatching_base_number", StringType(), True),
    StructField("date", DateType(), True),
    StructField("active_vehicles", IntegerType(), True),
    StructField("trips", IntegerType(), True),
])

In [4]:
# 1. Load the dataset
# Configure the CSV reader first, then load the file with the explicit schema.
# - header: the first line holds column names
# - dateFormat: dates in the file are written month/day/year
# - mode=dropmalformed: rows that do not fit the schema are dropped
# NOTE(review): badRecordsPath looks like a platform-specific option; confirm
# it has any effect on this local session.
csv_reader = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('dateFormat', 'M/d/y')
    .option('mode', 'dropmalformed')
    .option('badRecordsPath', './badRecords')
    .schema(schema)
)
df = csv_reader.load('../data/Mod5_uber_data.csv')

df.show(10)

+-----------------------+----------+---------------+-----+
|dispatching_base_number|      date|active_vehicles|trips|
+-----------------------+----------+---------------+-----+
|                 B02512|2015-01-01|            190| 1132|
|                 B02765|2015-01-01|            225| 1765|
|                 B02764|2015-01-01|           3427|29421|
|                 B02682|2015-01-01|            945| 7679|
|                 B02617|2015-01-01|           1228| 9537|
|                 B02598|2015-01-01|            870| 6903|
|                 B02598|2015-01-02|            785| 4768|
|                 B02617|2015-01-02|           1137| 7065|
|                 B02512|2015-01-02|            175|  875|
|                 B02682|2015-01-02|            890| 5506|
+-----------------------+----------+---------------+-----+
only showing top 10 rows



In [5]:
# 2. Create a temporary SQL table of the dataset
# Expose the loaded DataFrame to Spark SQL under the name UberTrip.
df.createOrReplaceTempView("UberTrip")
# CREATE OR REPLACE keeps this cell re-runnable: a plain CREATE TEMPORARY VIEW
# raises an "already exists" error the second time it runs in the same session.
spark.sql("CREATE OR REPLACE TEMPORARY VIEW UberTripTemp AS SELECT * FROM UberTrip")
# NOTE: despite its name, `rdd` is a DataFrame (the result of spark.sql), not
# an RDD. The name is kept because later cells reference it.
rdd = spark.sql("SELECT * FROM UberTripTemp")
rdd.show(10)

+-----------------------+----------+---------------+-----+
|dispatching_base_number|      date|active_vehicles|trips|
+-----------------------+----------+---------------+-----+
|                 B02512|2015-01-01|            190| 1132|
|                 B02765|2015-01-01|            225| 1765|
|                 B02764|2015-01-01|           3427|29421|
|                 B02682|2015-01-01|            945| 7679|
|                 B02617|2015-01-01|           1228| 9537|
|                 B02598|2015-01-01|            870| 6903|
|                 B02598|2015-01-02|            785| 4768|
|                 B02617|2015-01-02|           1137| 7065|
|                 B02512|2015-01-02|            175|  875|
|                 B02682|2015-01-02|            890| 5506|
+-----------------------+----------+---------------+-----+
only showing top 10 rows



In [6]:
# 3. Print the schema of the table
# `rdd` is the DataFrame returned by SELECT * on the UberTripTemp view, so this
# prints that view's column names, types and nullability.
rdd.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- date: date (nullable = true)
 |-- active_vehicles: integer (nullable = true)
 |-- trips: integer (nullable = true)



In [7]:
# 4. Print all the distinct ‘dispatching_base_number’
# Each base ID is listed once, under the alias unique_dispatch_number.
distinct_bases_sql = 'SELECT DISTINCT dispatching_base_number as unique_dispatch_number FROM UberTripTemp'
distinct_dispatching_base_number = spark.sql(distinct_bases_sql)
distinct_dispatching_base_number.show()

+----------------------+
|unique_dispatch_number|
+----------------------+
|                B02512|
|                B02598|
|                B02682|
|                B02765|
|                B02617|
|                B02764|
+----------------------+



In [27]:
# 5. Determine which dispatching base is the busiest based on the number of trips
# Total the trips per base, sort the totals descending and keep the top base.
busiest_base_sql = 'select dispatching_base_number \
                            from UberTripTemp\
                            group by dispatching_base_number\
                            order by sum(trips) desc\
                            limit 1'
top_rows = spark.sql(busiest_base_sql).collect()
# The query returns a single row with a single column: the base ID.
busiest_base = top_rows[0][0]
f'Busiest base by trips: {busiest_base}'

'Busiest base by trips: B02764'

In [33]:
#Alternate method - Column Based Expression
import pyspark.sql.functions as f

# Same question as the SQL version, expressed with DataFrame operations:
# total trips per base, highest total first, top row only.
trips_per_base = (
    rdd
    .groupBy('dispatching_base_number')
    .agg(f.sum('trips').alias('TotalTrips'))
)
trips_per_base.orderBy(f.desc('TotalTrips')).limit(1).show()


+-----------------------+----------+
|dispatching_base_number|TotalTrips|
+-----------------------+----------+
|                 B02764|   1914449|
+-----------------------+----------+



In [34]:
# 6. Determine the five busiest days based on the number of trips in the time range of the data
# Total the trips across all bases per date and show the five largest totals.
busiest_days_sql = 'select date, sum(trips) as TotalTrips\
            from UberTripTemp\
            group by date\
            order by sum(trips) desc\
            limit 5'
spark.sql(busiest_days_sql).show()

+----------+----------+
|      date|TotalTrips|
+----------+----------+
|2015-02-20|    100915|
|2015-02-14|    100345|
|2015-02-21|     98380|
|2015-02-13|     98024|
|2015-01-31|     92257|
+----------+----------+



In [46]:
# Alternate method - Window-Rank
from pyspark.sql.window import Window

# One window over all dates (no partitioning), ordered by daily total descending.
window=Window.orderBy(f.desc('TotalTrips'))

# Step 1: total trips per date.
daily_totals = rdd.groupBy('date').agg(f.sum('trips').alias('TotalTrips'))
# Step 2: dense-rank the dates by total; equal totals share a rank, so the
# filter below can return more than five rows when totals tie.
ranked_days = daily_totals.withColumn('drank',f.dense_rank().over(window))
# Step 3: keep ranks 1 through 5.
ranked_days.where(f.col('drank')<=5).show()

+----------+----------+-----+
|      date|TotalTrips|drank|
+----------+----------+-----+
|2015-02-20|    100915|    1|
|2015-02-14|    100345|    2|
|2015-02-21|     98380|    3|
|2015-02-13|     98024|    4|
|2015-01-31|     92257|    5|
+----------+----------+-----+



In [16]:
# 7. Calculate the average number of active vehicles on the base station ‘B02512’
# CEIL rounds the average up to the next whole number, so the displayed value
# is an integer rather than the exact mean.
avg_vehicles_sql = 'SELECT CEIL(AVG(active_vehicles)) as average_active_vehicles_B02512\
                                            FROM UberTripTemp\
                                            WHERE dispatching_base_number="B02512"'
average_no_of_active_vehicles = spark.sql(avg_vehicles_sql)
average_no_of_active_vehicles.show()

+------------------------------+
|average_active_vehicles_B02512|
+------------------------------+
|                           223|
+------------------------------+



In [None]:
# Uncomment to shut down the Spark session once the analysis is finished.
# spark.stop()