In [None]:
import pyspark

from pyspark.sql import SparkSession

In [None]:
# Create (or reuse) the Spark session for this notebook, with the driver
# memory setting "spark.driver.memory" set to "8g".
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .appName('chapter_7')
    .getOrCreate()
)

### Preparing the Data

In [None]:
# Load the CSV data found at "taxidata"; the first line of each file is
# treated as the header row.
taxi_raw = (
    spark.read
    .option("header", "true")
    .csv("taxidata")
)

# Preview a single record, printed vertically (one field per line).
taxi_raw.show(1, vertical=True)

In [None]:
# Inspect the column names and types as loaded. The reader above was given no
# schema and no inferSchema option, so the columns are expected to be strings.
taxi_raw.printSchema()

In [None]:
from pyspark.sql import functions as fun

# Replace the pickup and drop-off datetime columns with timestamps parsed
# using the "yyyy-MM-dd HH:mm:ss" pattern.
for ts_col in ('pickup_datetime', 'dropoff_datetime'):
    taxi_raw = taxi_raw.withColumn(
        ts_col,
        fun.to_timestamp(fun.col(ts_col), "yyyy-MM-dd HH:mm:ss"),
    )

In [None]:
# Print the schema again to check the two datetime columns after the
# to_timestamp conversion above.
taxi_raw.printSchema()

In [None]:
# Show the 3 records with the latest pickup_datetime (descending sort),
# printed vertically.
taxi_raw.sort(fun.col("pickup_datetime").desc()).show(3, vertical=True)

In [None]:
# Columns that the geospatial and temporal steps of this notebook rely on.
geospatial_temporal_colnames = [
    "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude",
    "pickup_datetime", "dropoff_datetime",
]

# Count the null entries in each of those columns.
# `count` only counts non-null values, so the `when` branch must yield a
# non-null marker (the literal 1) for null rows. Yielding the column itself
# would yield null for exactly the rows of interest and always report 0.
taxi_raw.select(
    [
        fun.count(fun.when(fun.isnull(c), 1)).alias(c)
        for c in geospatial_temporal_colnames
    ]
).show()


In [None]:
# Drop the rows that have a null in the geospatial/temporal columns listed in
# geospatial_temporal_colnames (na.drop defaults to how="any").
taxi_raw = taxi_raw.na.drop(subset=geospatial_temporal_colnames)

In [None]:
print("Count of zero dropoff, pickup latitude and longitude records")

# True when any of the four coordinate columns equals 0.
has_zero_coordinate = (
    (fun.col("dropoff_longitude") == 0)
    | (fun.col("dropoff_latitude") == 0)
    | (fun.col("pickup_longitude") == 0)
    | (fun.col("pickup_latitude") == 0)
)

# Number of records on each side of that predicate.
taxi_raw.groupBy(has_zero_coordinate).count().show()

### Geospatial Analysis

In [None]:
# Peek at the first 7 lines of the boroughs GeoJSON file with the shell.
! head -n 7 nyc-boroughs.geojson

In [None]:
import geopandas as gdp

# Read the GeoJSON file into a GeoDataFrame (geopandas is imported as `gdp`).
gdf = gdp.read_file("nyc-boroughs.geojson")

In [None]:
# Reproject to EPSG:3857 so that areas are computed in a projected CRS
# rather than in degrees.
gdf = gdf.to_crs(3857)

# Area of every geometry, computed with the vectorized GeoSeries property
# instead of a row-by-row apply.
gdf['area'] = gdf['geometry'].area
gdf.head(5)

In [None]:
# Order by boroughCode ascending and, within each code, by area descending
# (largest geometry first).
gdf = gdf.sort_values(
    by=['boroughCode', 'area'],
    ascending=[True, False],
)
gdf.head(5)

In [None]:
from pyspark.sql.types import StringType

# Broadcast the borough polygons to the executors. `gdf` was reprojected to
# EPSG:3857 for the area computation, but the taxi coordinates are plain
# longitude/latitude values, so the broadcast copy is converted back to
# EPSG:4326; otherwise the point-in-polygon test below could never match.
b_gdf = spark.sparkContext.broadcast(gdf.to_crs(4326))

def find_borough(latitude,longitude):
    """Return the name of the first borough whose geometry contains the point.

    Parameters
    ----------
    latitude, longitude
        Coordinates of the point, in degrees (anything `points_from_xy`
        can convert to float).

    Returns
    -------
    The `borough` value of the first row (in the broadcast frame's row
    order) whose geometry intersects the point, or None if no row matches.
    """
    # Build the point once instead of once per borough row.
    point = gdp.points_from_xy([longitude], [latitude])[0]
    boroughs = b_gdf.value
    # Rows are visited in frame order and the first hit wins.
    for borough, geometry in zip(boroughs['borough'], boroughs['geometry']):
        if geometry.intersects(point):
            return borough
    return None

# Spark UDF wrapper that returns the borough name as a string column.
find_borough_udf = fun.udf(find_borough, StringType())

In [None]:
# Tag every trip with the borough of its drop-off point via the UDF.
df_with_boroughs = taxi_raw.withColumn(
    "dropoff_borough",
    find_borough_udf(
        fun.col("dropoff_latitude"),
        fun.col('dropoff_longitude'),
    ),
)

# Number of trips per drop-off borough.
(
    df_with_boroughs
    .groupBy(fun.col("dropoff_borough"))
    .count()
    .show()
)

### Sessionization in PySpark

In [None]:
from pyspark.sql import Window

# Window over each hack_license, ordered by hack_license and then by
# pickup_datetime.
window_spec = (
    Window
    .partitionBy("hack_license")
    .orderBy(fun.col("hack_license"), fun.col("pickup_datetime"))
)

In [None]:
# A WindowSpec is only a description of a window and has no cache() method,
# so calling it raises AttributeError. Cache the DataFrame the window is
# applied to instead. cache() returns the same DataFrame, so re-running this
# cell is harmless.
df_with_boroughs = df_with_boroughs.cache()

In [None]:
# Pickup time of the previous row in the same window (null for the first row
# of each hack_license).
previous_pickup = fun.lag(fun.col("pickup_datetime"), 1).over(window_spec)

# NOTE(review): this measures the gap between consecutive pickups. If the
# intended metric is the idle time after the previous drop-off, lag
# dropoff_datetime instead - confirm.
df_with_borough_durations = (
    df_with_boroughs
    # Gap to the previous pickup, as a timestamp difference.
    .withColumn("trip_time_difference",
                fun.col("pickup_datetime") - previous_pickup)
    # The same gap as a whole number of seconds (timestamps cast to long are
    # epoch seconds); the following cells filter and aggregate on `seconds`.
    .withColumn("seconds",
                fun.col("pickup_datetime").cast("long")
                - previous_pickup.cast("long"))
)

# show() returns None, so it must not be part of the assignment above.
df_with_borough_durations.show(50, vertical=True)

In [None]:
# Histogram of the `seconds` column bucketed into whole hours.
(
    df_with_borough_durations
    .selectExpr("floor(seconds / 3600) as hours")
    .groupBy("hours")
    .count()
    .sort("hours")
    .show()
)

In [None]:
from pyspark.sql.functions import avg, stddev

# Mean and standard deviation of `seconds` per drop-off borough, restricted to
# gaps that are positive and shorter than four hours (60*60*4 seconds).
# The borough column created by the withColumn step is named
# `dropoff_borough`; there is no `borough` column to group by.
(
    df_with_borough_durations
    .where("seconds > 0 AND seconds < 60*60*4")
    .groupBy("dropoff_borough")
    .agg(avg("seconds"), stddev("seconds"))
    .show()
)