In [1]:
# Create the data directory; -p keeps the cell re-runnable when it already exists.
!mkdir -p taxidata

In [2]:
# NOTE: a `!` command runs in its own subshell, so this `cd` does not change the
# notebook's working directory. The cells below rely on that: they download into
# the current directory and later move the CSV into taxidata/.
!cd taxidata

In [3]:
# Download the January 2013 trip archive into the current directory.
# --fail makes curl exit with an error on an HTTP failure instead of saving the
# server's error page under the .zip name.
!curl --fail -O https://storage.googleapis.com/aas-data-sets/trip_data_1.csv.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  899M  100  899M    0     0  93.1M      0  0:00:09  0:00:09 --:--:-- 88.4M


In [4]:
# Extract trip_data_1.csv from the downloaded archive into the current directory.
!unzip trip_data_1.csv.zip

Archive:  trip_data_1.csv.zip
  inflating: trip_data_1.csv         


In [5]:
# Peek at the header line and the first four records of the raw CSV.
!head -n 5 trip_data_1.csv

medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.00,-73.978165,40.757977,-73.989838,40.751171
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.50,-74.006683,40.731781,-73.994499,40.75066
0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.10,-74.004707,40.73777,-74.009834,40.726002
DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,.70,-73.974602,40.759945,-73.984734,40.759388


In [6]:
# Move the extracted CSV into taxidata/, the directory Spark reads from below.
!mv trip_data_1.csv taxidata/

In [9]:
!pip install pyspark
import pyspark

from pyspark.sql import SparkSession

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=0e430f9d1515795a0ffb3e444773c0a669137de6ecfb1bcf4775d54fe1696b58
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [10]:
# Start (or reuse) the Spark session for this chapter with an 8 GB driver heap.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .appName('chapter_7')
    .getOrCreate()
)

# Preparing the Data

In [11]:
# Load the trip file; header=True takes the column names from the first row.
# No schema is inferred, so every column is read as a string.
trips_csv_path = "taxidata/trip_data_1.csv"
taxi_raw = spark.read.csv(trips_csv_path, header=True)
taxi_raw.show(n=1, vertical=True)

-RECORD 0----------------------------------
 medallion          | 89D227B655E5C82AE... 
 hack_license       | BA96DE419E711691B... 
 vendor_id          | CMT                  
 rate_code          | 1                    
 store_and_fwd_flag | N                    
 pickup_datetime    | 2013-01-01 15:11:48  
 dropoff_datetime   | 2013-01-01 15:18:10  
 passenger_count    | 4                    
 trip_time_in_secs  | 382                  
 trip_distance      | 1.00                 
 pickup_longitude   | -73.978165           
 pickup_latitude    | 40.757977            
 dropoff_longitude  | -73.989838           
 dropoff_latitude   | 40.751171            
only showing top 1 row



In [12]:
# Inspect the column types as loaded from the CSV.
taxi_raw.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)



In [13]:
from pyspark.sql import functions as fun

# Parse both trip timestamps from their string form into Spark timestamps.
datetime_format = "yyyy-MM-dd HH:mm:ss"
for datetime_colname in ('pickup_datetime', 'dropoff_datetime'):
    taxi_raw = taxi_raw.withColumn(
        datetime_colname,
        fun.to_timestamp(fun.col(datetime_colname), datetime_format),
    )

In [14]:
# Confirm that pickup_datetime and dropoff_datetime are now timestamp columns.
taxi_raw.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- vendor_id: string (nullable = true)
 |-- rate_code: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)



In [15]:
# Show the three most recent pickups to sanity-check the parsed timestamps.
taxi_raw.orderBy(fun.desc('pickup_datetime')).show(3, vertical=True)

-RECORD 0----------------------------------
 medallion          | EA00A64CBDB68C77D... 
 hack_license       | 2045C77002FA0F2E0... 
 vendor_id          | CMT                  
 rate_code          | 1                    
 store_and_fwd_flag | N                    
 pickup_datetime    | 2013-01-31 23:59:59  
 dropoff_datetime   | 2013-02-01 00:08:39  
 passenger_count    | 1                    
 trip_time_in_secs  | 520                  
 trip_distance      | 1.50                 
 pickup_longitude   | -73.970528           
 pickup_latitude    | 40.75502             
 dropoff_longitude  | -73.981201           
 dropoff_latitude   | 40.769104            
-RECORD 1----------------------------------
 medallion          | E3F00BB3F4E710383... 
 hack_license       | 10A2B96DE39865918... 
 vendor_id          | CMT                  
 rate_code          | 1                    
 store_and_fwd_flag | N                    
 pickup_datetime    | 2013-01-31 23:59:59  
 dropoff_datetime   | 2013-02-01

In [16]:
# Columns that must be present for the geospatial / temporal analysis.
geospatial_temporal_colnames = [
    "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude",
    "pickup_datetime", "dropoff_datetime",
]

# Count NULLs per column. when() yields the column name as a non-null literal
# only where the column is NULL, and count() counts just those non-null values.
null_count_exprs = [
    fun.count(fun.when(fun.isnull(colname), colname)).alias(colname)
    for colname in geospatial_temporal_colnames
]
taxi_raw.select(null_count_exprs).show()

+----------------+---------------+-----------------+----------------+---------------+----------------+
|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_datetime|dropoff_datetime|
+----------------+---------------+-----------------+----------------+---------------+----------------+
|               0|              0|               86|              86|              0|               0|
+----------------+---------------+-----------------+----------------+---------------+----------------+



In [17]:
# Discard every trip that is missing any coordinate or timestamp.
taxi_raw = taxi_raw.dropna(subset=geospatial_temporal_colnames)

In [18]:
# Count trips where any pickup/dropoff coordinate equals zero.
# NOTE(review): the coordinate columns are still strings here, so the comparison
# with 0 relies on Spark's implicit cast; the NULL bucket in the output is
# presumably rows whose values could not be cast - confirm.
zero_check_colnames = ["dropoff_longitude", "dropoff_latitude",
                       "pickup_longitude", "pickup_latitude"]

# Fold the per-column checks left-to-right into a single OR expression.
any_zero_coordinate = fun.col(zero_check_colnames[0]) == 0
for colname in zero_check_colnames[1:]:
    any_zero_coordinate = any_zero_coordinate | (fun.col(colname) == 0)

print("Count of zero dropoff, pickup latitude and longitude records")
taxi_raw.groupBy(any_zero_coordinate).count().show()

Count of zero dropoff, pickup latitude and longitude records
+----------------------------------------------------------------------------------------------------------+--------+
|((((dropoff_longitude = 0) OR (dropoff_latitude = 0)) OR (pickup_longitude = 0)) OR (pickup_latitude = 0))|   count|
+----------------------------------------------------------------------------------------------------------+--------+
|                                                                                                      NULL|     176|
|                                                                                                      true|  286433|
|                                                                                                     false|14489920|
+----------------------------------------------------------------------------------------------------------+--------+



# Geospatial Analysis

In [19]:
url="https://nycdatastables.s3.amazonaws.com/2013-08-19T18:15:35.172Z/nyc-borough-boundaries-polygon.geojson"
!curl -O $url

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   304    0   304    0     0   1094      0 --:--:-- --:--:-- --:--:--  1097


In [20]:
# Rename the downloaded file to the shorter name used by the cells below.
!mv nyc-borough-boundaries-polygon.geojson nyc-boroughs.geojson

In [25]:
# Print the first lines of the downloaded file to check its contents.
!head -n 7 nyc-boroughs.geojson

<?xml version="1.0" encoding="UTF-8"?>
<Error><Code>NoSuchBucket</Code><Message>The specified bucket does not exist</Message><BucketName>nycdatastables</BucketName><RequestId>XY0W84DKGF5YWG82</RequestId><HostId>w7CMMStIqqsg6cq4v4W8c2GORvOHgwUoqWNFhxcc/KBKJzxBkRc8hJdu8NK1bs69k/vb0gR423c=</HostId></Error>

In [26]:
!pip install geopandas



In [None]:
import geopandas as gdp

# Parse the borough-boundary file into a GeoDataFrame.
# NOTE(review): on the recorded run nyc-boroughs.geojson contains an S3 error
# document rather than GeoJSON, so this read presumably fails until a valid
# file is in place - confirm.
gdf = gdp.read_file("nyc-boroughs.geojson")

In [None]:
# Compute each polygon's area on a projected copy (EPSG:3857, metres) but keep
# `gdf` itself in its original longitude/latitude CRS. find_borough tests taxi
# coordinates given in degrees against these geometries, so reprojecting `gdf`
# in place would make every point-in-polygon test miss.
# The area is only used to order polygons within a borough.
gdf['area'] = gdf.to_crs(3857).area
gdf.head(5)

In [None]:
# Order by borough code, largest polygon first within each borough.
sort_keys = ['boroughCode', 'area']
sort_ascending = [True, False]
gdf = gdf.sort_values(sort_keys, ascending=sort_ascending)
gdf.head(5)

In [None]:
from pyspark.sql.types import StringType

# Broadcast the borough GeoDataFrame; find_borough reads it through b_gdf.value.
b_gdf = spark.sparkContext.broadcast(gdf)


def find_borough(latitude,longitude):
    """Return the name of the first borough polygon containing the point.

    Args:
        latitude: Point latitude; a number or a numeric string.
        longitude: Point longitude; a number or a numeric string.

    Returns:
        The ``borough`` value of the first row (in the broadcast frame's row
        order) whose geometry intersects the point, or ``None`` when no polygon
        matches or the coordinates are missing or not numeric.
    """
    # The taxi coordinate columns are strings, so convert explicitly; a missing
    # or malformed value becomes "no borough" instead of failing the Spark task.
    try:
        lon = float(longitude)
        lat = float(latitude)
    except (TypeError, ValueError):
        return None

    point = gdp.points_from_xy([lon], [lat])[0]
    boroughs = b_gdf.value

    # One vectorised intersects() over all polygons instead of a row-wise apply.
    hits = boroughs.loc[boroughs['geometry'].intersects(point), 'borough'].dropna()
    return hits.iloc[0] if len(hits) else None


# Wrap find_borough as a Spark UDF that produces a string column.
find_borough_udf = fun.udf(find_borough, StringType())

In [None]:
# Tag every trip with the borough its drop-off point falls in.
dropoff_borough_expr = find_borough_udf(
    fun.col("dropoff_latitude"),
    fun.col('dropoff_longitude'),
)
df_with_boroughs = taxi_raw.withColumn("dropoff_borough", dropoff_borough_expr)

# Trips per drop-off borough; NULL collects the points outside every polygon.
df_with_boroughs.groupBy("dropoff_borough").count().show()

# Sessionization in PySpark

In [None]:
from pyspark.sql import Window

# One window per driver (hack_license), ordered by pickup time.
window_spec = (
    Window
    .partitionBy("hack_license")
    .orderBy("hack_license", "pickup_datetime")
)

In [None]:
# A WindowSpec only describes partitioning and ordering and has no cache()
# method, so calling it raises AttributeError. What can be cached is the
# DataFrame the window is applied to: caching it here avoids re-running the
# borough UDF for every action below.
df_with_boroughs.cache()

In [None]:
# Previous pickup time of the same driver (NULL for a driver's first trip).
previous_pickup = fun.lag(fun.col("pickup_datetime"), 1).over(window_spec)

# Keep the DataFrame itself: show() returns None, so it must not end the
# assignment chain. `seconds` holds the same gap as `trip_time_difference` as a
# plain number; the cells below filter and bucket on it.
# NOTE(review): the gap is measured pickup-to-previous-pickup; if idle time
# between rides is intended, the previous *dropoff* would be the reference -
# confirm.
df_with_borough_durations = (
    df_with_boroughs
    .withColumn("trip_time_difference",
                fun.col("pickup_datetime") - previous_pickup)
    .withColumn("seconds",
                fun.unix_timestamp(fun.col("pickup_datetime"))
                - fun.unix_timestamp(previous_pickup))
)
df_with_borough_durations.show(50, vertical=True)

In [None]:
# Histogram of the gap between consecutive trips, bucketed into whole hours.
hours_histogram = (
    df_with_borough_durations
    .selectExpr("floor(seconds / 3600) as hours")
    .groupBy("hours")
    .count()
    .sort("hours")
)
hours_histogram.show()

In [None]:
from pyspark.sql.functions import avg, stddev

# Mean and standard deviation of the gap (in seconds) per drop-off borough,
# keeping only positive gaps shorter than four hours.
# The frame has no "borough" column; the borough UDF result is stored as
# "dropoff_borough", so group on that name.
(
    df_with_borough_durations
    .where("seconds > 0 AND seconds < 60*60*4")
    .groupBy("dropoff_borough")
    .agg(avg("seconds"), stddev("seconds"))
    .show()
)