# Enabling the Spark session

In [1]:
import pyspark 
import pandas as pd
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

In [2]:
# Single-threaded local Spark session for this notebook (reused if one already exists).
spark = (
    SparkSession.builder
    .master("local")
    .appName("Uber_data")
    .getOrCreate()
)

In [3]:
# Session settings:
# - eagerEval: show a DataFrame as a table when it is the last expression of a cell.
# - timeParserPolicy=LEGACY: use the legacy (pre-Spark-3) datetime parser/formatter.
for conf_key, conf_value in (
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.legacy.timeParserPolicy", "LEGACY"),
):
    spark.conf.set(conf_key, conf_value)

# Reading the CSV file

In [4]:
# Load the raw trips; the first line is the header. No schema inference is requested,
# so every column arrives as a string (see the printed schema below).
df = spark.read.option("header", True).csv("uber_data.csv")

In [5]:
#df=spark.read.option('header','true').csv('uber_data.csv')

In [6]:
# Preview 5 rows (rendered as a table because eager evaluation is enabled).
df.limit(num=5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.5,-73.97674560546875,40.765151977539055,1,N,-74.00426483154298,40.74612808227539,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.9,-73.98348236083984,40.767925262451165,1,N,-74.00594329833984,40.7331657409668,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.78202056884764,40.64480972290039,1,N,-73.97454071044923,40.6757698059082,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8
2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.86341857910156,40.769813537597656,1,N,-73.96965026855469,40.757766723632805,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62
2,2016-03-01 00:00:00,2016-03-01 00:00:00,5,30.43,-73.97174072265625,40.79218292236328,3,N,-74.17716979980467,40.69505310058594,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8


In [7]:
# Print the schema tree; every column is a string because the CSV was read
# without schema inference.
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)



# Adding a trip ID and converting the datetime columns to timestamps

In [8]:
# Give every trip a contiguous 0-based surrogate key (0, 1, 2, ...).
# monotonically_increasing_id() on its own is only unique and increasing: it stores
# the partition index in the upper bits, so ids jump between partitions. It is also
# non-deterministic, so it can produce different values each time the plan is
# recomputed (for example, once per side of a join).
# Ranking rows in read order with row_number() makes the ids contiguous.
# localCheckpoint() materializes them, so every DataFrame later derived from df
# sees the same trip_id values.
# NOTE: the un-partitioned window gathers all rows into one partition. That is
# acceptable for a dataset of this size, but it would not scale to very large inputs.
trip_order = Window.orderBy("_read_order")
df = (
    df.withColumn("_read_order", monotonically_increasing_id())
      .withColumn("trip_id", (row_number().over(trip_order) - 1).cast("long"))
      .drop("_read_order")
      .localCheckpoint()
)

In [9]:
# Preview 5 rows, now including the trip_id column.
df.limit(num=5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,trip_id
1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.5,-73.97674560546875,40.765151977539055,1,N,-74.00426483154298,40.74612808227539,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,0
1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.9,-73.98348236083984,40.767925262451165,1,N,-74.00594329833984,40.7331657409668,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35,1
2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.78202056884764,40.64480972290039,1,N,-73.97454071044923,40.6757698059082,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8,2
2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.86341857910156,40.769813537597656,1,N,-73.96965026855469,40.757766723632805,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62,3
2,2016-03-01 00:00:00,2016-03-01 00:00:00,5,30.43,-73.97174072265625,40.79218292236328,3,N,-74.17716979980467,40.69505310058594,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8,4


In [10]:
# Parse the pickup and dropoff strings into timestamp columns, keeping the column names.
for ts_column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    df = df.withColumn(ts_column, to_timestamp(col(ts_column)))

In [11]:
# Confirm that the two datetime columns are now timestamps and trip_id is a long;
# all other columns are still strings.
df.printSchema()

root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- trip_id: long (nullable = false)



In [12]:
# Preview 4 rows after timestamp parsing.
df.limit(num=4)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,trip_id
1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.5,-73.97674560546875,40.765151977539055,1,N,-74.00426483154298,40.74612808227539,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,0
1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.9,-73.98348236083984,40.767925262451165,1,N,-74.00594329833984,40.7331657409668,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35,1
2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.78202056884764,40.64480972290039,1,N,-73.97454071044923,40.6757698059082,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8,2
2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.86341857910156,40.769813537597656,1,N,-73.96965026855469,40.757766723632805,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62,3


# Removing duplicate rows

In [13]:
# Number of rows before de-duplication.
rows_before_dedup = df.count()
rows_before_dedup

100000

In [14]:
# Number of columns (one schema field per column).
len(df.schema.fields)

20

In [15]:
# trip_id is unique per row, so a plain dropDuplicates() could never remove anything
# (the row count before and after was identical). Compare only the original trip
# attributes. For each group of duplicates, one row (and so one trip_id) is kept;
# which one is not specified.
df = df.dropDuplicates([column for column in df.columns if column != "trip_id"])

In [16]:
#df = df.withColumn("trip_id", monotonically_increasing_id())

In [17]:
# Number of rows after de-duplication; compare it with the count taken before.
rows_after_dedup = df.count()
rows_after_dedup

100000

In [18]:
# Preview 6 rows; de-duplication shuffles the data, so trip_id order is not preserved.
df.limit(num=6)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,trip_id
1,2016-03-01 00:00:14,2016-03-01 00:21:58,1,6.9,-73.95418548583984,40.76414108276367,1,N,-73.90243530273438,40.82696914672852,2,22.5,0.5,0.5,0.0,0.0,0.3,23.8,41
2,2016-03-01 00:00:40,2016-03-01 00:08:47,1,1.43,-73.99883270263672,40.72496795654297,1,N,-74.00899505615233,40.73814392089844,1,7.5,0.5,0.5,2.64,0.0,0.3,11.44,113
1,2016-03-01 00:01:06,2016-03-01 00:14:41,1,3.2,-73.98702239990233,40.75537490844727,1,N,-73.9505844116211,40.77481842041016,1,12.5,0.5,0.5,3.45,0.0,0.3,17.25,172
2,2016-03-10 07:07:48,2016-03-10 07:17:46,1,1.28,-73.98023986816406,40.73904037475586,1,N,-73.99152374267578,40.748779296875,1,8.5,0.0,0.5,1.0,0.0,0.3,10.3,251
2,2016-03-10 07:08:33,2016-03-10 07:15:24,5,1.92,-73.98770141601561,40.77016067504882,1,N,-73.96994018554686,40.794578552246094,1,8.0,0.0,0.5,1.0,0.0,0.3,9.8,376
2,2016-03-10 07:09:10,2016-03-10 07:19:57,1,1.81,-73.94940948486328,40.773529052734375,1,N,-73.96933746337889,40.76039123535156,1,9.5,0.0,0.5,2.06,0.0,0.3,12.36,467


In [19]:
# Restore ascending trip_id order after the de-duplication shuffle.
df = df.sort("trip_id")

In [20]:
# Preview 5 rows in trip_id order.
df.limit(num=5)

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,trip_id
1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.5,-73.97674560546875,40.765151977539055,1,N,-74.00426483154298,40.74612808227539,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,0
1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.9,-73.98348236083984,40.767925262451165,1,N,-74.00594329833984,40.7331657409668,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35,1
2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.78202056884764,40.64480972290039,1,N,-73.97454071044923,40.6757698059082,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8,2
2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.86341857910156,40.769813537597656,1,N,-73.96965026855469,40.757766723632805,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62,3
2,2016-03-01 00:00:00,2016-03-01 00:00:00,5,30.43,-73.97174072265625,40.79218292236328,3,N,-74.17716979980467,40.69505310058594,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8,4


# Rearranging the DataFrame columns (trip_id first)

In [21]:
# Current column names, in schema order.
[field.name for field in df.schema.fields]

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'RatecodeID',
 'store_and_fwd_flag',
 'dropoff_longitude',
 'dropoff_latitude',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'trip_id']

In [22]:
# Move trip_id to the front; the remaining columns keep their original order.
df = df.select(
    "trip_id",
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "pickup_longitude",
    "pickup_latitude",
    "RatecodeID",
    "store_and_fwd_flag",
    "dropoff_longitude",
    "dropoff_latitude",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
)

In [23]:
# Preview 5 rows with trip_id as the first column.
df.limit(num=5)

trip_id,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,RatecodeID,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,2016-03-01 00:00:00,2016-03-01 00:07:55,1,2.5,-73.97674560546875,40.765151977539055,1,N,-74.00426483154298,40.74612808227539,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,1,2016-03-01 00:00:00,2016-03-01 00:11:06,1,2.9,-73.98348236083984,40.767925262451165,1,N,-74.00594329833984,40.7331657409668,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2,2016-03-01 00:00:00,2016-03-01 00:31:06,2,19.98,-73.78202056884764,40.64480972290039,1,N,-73.97454071044923,40.6757698059082,1,54.5,0.5,0.5,8.0,0.0,0.3,63.8
3,2,2016-03-01 00:00:00,2016-03-01 00:00:00,3,10.78,-73.86341857910156,40.769813537597656,1,N,-73.96965026855469,40.757766723632805,1,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,2,2016-03-01 00:00:00,2016-03-01 00:00:00,5,30.43,-73.97174072265625,40.79218292236328,3,N,-74.17716979980467,40.69505310058594,1,98.0,0.0,0.0,0.0,15.5,0.3,113.8


# Creating "datetime_dim" dimension table

In [24]:
def _iso_weekday(ts_column):
    """ISO weekday (1 = Monday ... 7 = Sunday) built from dayofweek() (1 = Sunday ... 7 = Saturday)."""
    return ((dayofweek(ts_column) + 5) % 7) + 1


# Date/time dimension: one row per trip, with the pickup and dropoff timestamps split
# into calendar parts.
# datetime_id reuses trip_id so each row joins back to exactly its own trip in the
# fact table. A separately generated monotonically_increasing_id() only equals trip_id
# by coincidence, because it depends on partitioning and is recomputed per query.
# The weekday is derived with dayofweek() rather than date_format(..., 'u'), because
# Spark 3 rejects the 'u' pattern unless spark.sql.legacy.timeParserPolicy is LEGACY.
datetime_dim = (
    df.select("trip_id", "tpep_pickup_datetime", "tpep_dropoff_datetime")
      .withColumn("pick_hour", hour("tpep_pickup_datetime"))
      .withColumn("pick_day", dayofmonth("tpep_pickup_datetime"))
      .withColumn("pick_month", month("tpep_pickup_datetime"))
      .withColumn("pick_year", year("tpep_pickup_datetime"))
      .withColumn("pick_weekday", _iso_weekday("tpep_pickup_datetime"))
      .withColumn("drop_hour", hour("tpep_dropoff_datetime"))
      .withColumn("drop_day", dayofmonth("tpep_dropoff_datetime"))
      .withColumn("drop_month", month("tpep_dropoff_datetime"))
      .withColumn("drop_year", year("tpep_dropoff_datetime"))
      .withColumn("drop_weekday", _iso_weekday("tpep_dropoff_datetime"))
      .withColumn("datetime_id", col("trip_id"))
      .drop("trip_id")
)

In [25]:
# Preview 5 rows of the datetime dimension.
datetime_dim.limit(num=5)

tpep_pickup_datetime,tpep_dropoff_datetime,pick_hour,pick_day,pick_month,pick_year,pick_weekday,drop_hour,drop_day,drop_month,drop_year,drop_weekday,datetime_id
2016-03-01 00:00:00,2016-03-01 00:07:55,0,1,3,2016,2,0,1,3,2016,2,0
2016-03-01 00:00:00,2016-03-01 00:11:06,0,1,3,2016,2,0,1,3,2016,2,1
2016-03-01 00:00:00,2016-03-01 00:31:06,0,1,3,2016,2,0,1,3,2016,2,2
2016-03-01 00:00:00,2016-03-01 00:00:00,0,1,3,2016,2,0,1,3,2016,2,3
2016-03-01 00:00:00,2016-03-01 00:00:00,0,1,3,2016,2,0,1,3,2016,2,4


In [26]:
# Column names of the datetime dimension, in schema order.
[field.name for field in datetime_dim.schema.fields]

['tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'pick_hour',
 'pick_day',
 'pick_month',
 'pick_year',
 'pick_weekday',
 'drop_hour',
 'drop_day',
 'drop_month',
 'drop_year',
 'drop_weekday',
 'datetime_id']

In [27]:
# Key first, then the pickup timestamp and its parts, then the dropoff timestamp and its parts.
datetime_dim = datetime_dim.select(
    "datetime_id",
    "tpep_pickup_datetime",
    "pick_hour",
    "pick_day",
    "pick_month",
    "pick_year",
    "pick_weekday",
    "tpep_dropoff_datetime",
    "drop_hour",
    "drop_day",
    "drop_month",
    "drop_year",
    "drop_weekday",
)

In [28]:
# Preview 5 rows of the reordered datetime dimension.
datetime_dim.limit(num=5)

datetime_id,tpep_pickup_datetime,pick_hour,pick_day,pick_month,pick_year,pick_weekday,tpep_dropoff_datetime,drop_hour,drop_day,drop_month,drop_year,drop_weekday
0,2016-03-01 00:00:00,0,1,3,2016,2,2016-03-01 00:07:55,0,1,3,2016,2
1,2016-03-01 00:00:00,0,1,3,2016,2,2016-03-01 00:11:06,0,1,3,2016,2
2,2016-03-01 00:00:00,0,1,3,2016,2,2016-03-01 00:31:06,0,1,3,2016,2
3,2016-03-01 00:00:00,0,1,3,2016,2,2016-03-01 00:00:00,0,1,3,2016,2
4,2016-03-01 00:00:00,0,1,3,2016,2,2016-03-01 00:00:00,0,1,3,2016,2


# Creating "passenger_count_dim" dimension table

In [29]:
# Passenger-count dimension: one row per trip.
# passenger_count_id reuses trip_id so each row joins back to exactly its own trip in
# the fact table. A fresh monotonically_increasing_id() only matches trip_id by
# coincidence, because it depends on partitioning and is recomputed per query.
passenger_count_dim = df.select(
    col("trip_id").alias("passenger_count_id"),
    "passenger_count",
)

In [30]:
# Preview 5 rows of the passenger-count dimension.
passenger_count_dim.limit(num=5)

passenger_count_id,passenger_count
0,1
1,1
2,2
3,3
4,5


# Creating "trip_distance_dim" dimension table

In [31]:
# Trip-distance dimension: one row per trip.
# trip_distance_id reuses trip_id so each row joins back to exactly its own trip in
# the fact table. A fresh monotonically_increasing_id() only matches trip_id by
# coincidence, because it depends on partitioning and is recomputed per query.
trip_distance_dim = df.select(
    col("trip_id").alias("trip_distance_id"),
    "trip_distance",
)

In [32]:
# Preview 5 rows of the trip-distance dimension.
trip_distance_dim.limit(num=5)

trip_distance_id,trip_distance
0,2.5
1,2.9
2,19.98
3,10.78
4,30.43


# Creating "rate_code_dim" dimension table

In [33]:
# Human-readable names for the RatecodeID values (keys are strings because the CSV
# columns are read as strings).
rate_code_type = {
    "1":"Standard rate",
    "2":"JFK",
    "3":"Newark",
    "4":"Nassau or Westchester",
    "5":"Negotiated fare",
    "6":"Group ride"
}

# Rate-code dimension: one row per trip with the raw code and its name.
# rate_code_id reuses trip_id so each row joins back to exactly its own trip in the
# fact table. A fresh monotonically_increasing_id() only matches trip_id by coincidence.
# Codes missing from rate_code_type keep their raw value as the name.
rate_code_dim = (
    df.select(col("trip_id").alias("rate_code_id"), "RatecodeID")
      .withColumn("rate_code_name", col("RatecodeID"))
      .replace(to_replace=rate_code_type, subset=["rate_code_name"])
      .select("rate_code_id", "RatecodeID", "rate_code_name")
)

In [34]:
# Preview 5 rows of the rate-code dimension.
rate_code_dim.limit(num=5)

rate_code_id,RatecodeID,rate_code_name
0,1,Standard rate
1,1,Standard rate
2,1,Standard rate
3,1,Standard rate
4,3,Newark


# Creating "pickup_location_dim" dimension table

In [35]:
# Pickup-location dimension: one row per trip with the pickup coordinates.
# pickup_location_id reuses trip_id so each row joins back to exactly its own trip in
# the fact table. A fresh monotonically_increasing_id() only matches trip_id by coincidence.
pickup_location_dim = df.select(
    col("trip_id").alias("pickup_location_id"),
    "pickup_latitude",
    "pickup_longitude",
)

In [36]:
# Preview 5 rows of the pickup-location dimension.
pickup_location_dim.limit(num=5)

pickup_location_id,pickup_latitude,pickup_longitude
0,40.765151977539055,-73.97674560546875
1,40.767925262451165,-73.98348236083984
2,40.64480972290039,-73.78202056884764
3,40.769813537597656,-73.86341857910156
4,40.79218292236328,-73.97174072265625


# Creating "dropoff_location_dim" dimension table

In [37]:
# Dropoff-location dimension: one row per trip with the dropoff coordinates.
# dropoff_location_id reuses trip_id so each row joins back to exactly its own trip in
# the fact table. A fresh monotonically_increasing_id() only matches trip_id by coincidence.
dropoff_location_dim = df.select(
    col("trip_id").alias("dropoff_location_id"),
    "dropoff_latitude",
    "dropoff_longitude",
)

In [38]:
# Preview 5 rows of the dropoff-location dimension.
dropoff_location_dim.limit(num=5)

dropoff_location_id,dropoff_latitude,dropoff_longitude
0,40.74612808227539,-74.00426483154298
1,40.7331657409668,-74.00594329833984
2,40.6757698059082,-73.97454071044923
3,40.757766723632805,-73.96965026855469
4,40.69505310058594,-74.17716979980467


# Creating "payment_type_dim" dimension table

In [39]:
# Human-readable names for the payment_type values (keys are strings because the CSV
# columns are read as strings).
payment_type_name = {
    "1":"Credit card",
    "2":"Cash",
    "3":"No charge",
    "4":"Dispute",
    "5":"Unknown",
    "6":"Voided trip"
}

# Payment-type dimension: one row per trip with the raw code and its name.
# payment_type_id reuses trip_id so each row joins back to exactly its own trip in the
# fact table. A fresh monotonically_increasing_id() only matches trip_id by coincidence.
# Codes missing from payment_type_name keep their raw value as the name.
payment_type_dim = (
    df.select(col("trip_id").alias("payment_type_id"), "payment_type")
      .withColumn("payment_type_name", col("payment_type"))
      .replace(to_replace=payment_type_name, subset=["payment_type_name"])
      .select("payment_type_id", "payment_type", "payment_type_name")
)

In [40]:
# Preview 5 rows of the payment-type dimension.
payment_type_dim.limit(num=5)

payment_type_id,payment_type,payment_type_name
0,1,Credit card
1,1,Credit card
2,1,Credit card
3,1,Credit card
4,1,Credit card


# Creating the "fact_table" table

In [41]:
# Fact table: inner-join every dimension back to the trips on trip_id == <dimension key>,
# then keep the trip identifiers, the dimension keys and the per-trip monetary measures.
fact_table = (
    df
    .join(passenger_count_dim, df["trip_id"] == passenger_count_dim["passenger_count_id"])
    .join(trip_distance_dim, df["trip_id"] == trip_distance_dim["trip_distance_id"])
    .join(rate_code_dim, df["trip_id"] == rate_code_dim["rate_code_id"])
    .join(pickup_location_dim, df["trip_id"] == pickup_location_dim["pickup_location_id"])
    .join(dropoff_location_dim, df["trip_id"] == dropoff_location_dim["dropoff_location_id"])
    .join(datetime_dim, df["trip_id"] == datetime_dim["datetime_id"])
    .join(payment_type_dim, df["trip_id"] == payment_type_dim["payment_type_id"])
    .select(
        "trip_id", "VendorID", "datetime_id", "passenger_count_id",
        "trip_distance_id", "rate_code_id", "store_and_fwd_flag",
        "pickup_location_id", "dropoff_location_id", "payment_type_id",
        "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
        "improvement_surcharge", "total_amount",
    )
    .orderBy("trip_id")
)



In [42]:
# Preview 5 rows of the fact table.
fact_table.limit(num=5)

trip_id,VendorID,datetime_id,passenger_count_id,trip_distance_id,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type_id,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
0,1,0,0,0,0,N,0,0,0,9.0,0.5,0.5,2.05,0.0,0.3,12.35
1,1,1,1,1,1,N,1,1,1,11.0,0.5,0.5,3.05,0.0,0.3,15.35
2,2,2,2,2,2,N,2,2,2,54.5,0.5,0.5,8.0,0.0,0.3,63.8
3,2,3,3,3,3,N,3,3,3,31.5,0.0,0.5,3.78,5.54,0.3,41.62
4,2,4,4,4,4,N,4,4,4,98.0,0.0,0.0,0.0,15.5,0.3,113.8
