In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
# Build a Spark session named 'test' against the 'local[*]' master, or reuse
# one that is already running in this kernel.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('test')
    .getOrCreate()
)

24/03/04 23:12:12 WARN Utils: Your hostname, EG.local resolves to a loopback address: 127.0.0.1; using 192.168.100.10 instead (on interface en0)
24/03/04 23:12:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/04 23:12:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# First pass: read the CSV with only the header option set, so the column
# names come from the first line and no types are supplied.
df = spark.read.csv('fhv_tripdata_2019-10.csv', header="true")

In [4]:
# Inspect the schema Spark assigned: with no schema supplied, every column
# comes back as a nullable string.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [None]:
# Copy the first 11 lines of the trip CSV (the header line plus 10 data rows)
# into head.csv; that small sample is later used for schema inference.
!head -n 11 fhv_tripdata_2019-10.csv > head.csv

In [5]:
import pandas as pd

### Method 1: declare the schema explicitly

In [None]:
from pyspark.sql import types

In [None]:
# Explicit schema for the FHV trip CSV.
# Field names use the exact spelling found in the file's header
# (dropOff_datetime, PUlocationID, DOlocationID). That way a DataFrame built
# with this schema exposes the same column names as one built from an inferred
# schema, and attribute access such as df.dropOff_datetime (which matches
# column names case-sensitively in Python) keeps working in later cells.
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropOff_datetime', types.TimestampType(), True),
    types.StructField('PUlocationID', types.IntegerType(), True),
    types.StructField('DOlocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

In [None]:
# Re-read the full CSV, this time applying the hand-written schema instead of
# leaving every column untyped.
df = (
    spark.read
    .schema(schema)
    .option("header", "true")
    .csv('fhv_tripdata_2019-10.csv')
)

### Method 2: infer the schema from a small sample (`head.csv`)

In [4]:
# Let Spark infer column types from the small sample file only; the resulting
# schema is reused for the full dataset.
df_head = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('head.csv')
)

In [11]:
# Show what inference produced from head.csv: timestamps for the two datetime
# columns, integers for the two location-ID columns, strings for the rest.
df_head.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [5]:
# Load the full CSV using the schema inferred from the sample, so the large
# file itself is read with a fixed, already-known schema.
df = spark.read.csv(
    'fhv_tripdata_2019-10.csv',
    schema=df_head.schema,
    header="true",
)

In [6]:
# Confirm the full dataset now carries the typed schema borrowed from df_head.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', TimestampType(), True), StructField('dropOff_datetime', TimestampType(), True), StructField('PUlocationID', IntegerType(), True), StructField('DOlocationID', IntegerType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [14]:
# Redistribute the rows across 6 partitions before writing.
# NOTE(review): presumably chosen so the parquet write produces 6 part files — confirm.
df = df.repartition(6)

In [15]:
# Persist the repartitioned data as parquet. mode='overwrite' replaces output
# left behind by a previous run; with the default save mode this cell raises
# as soon as the target directory already exists, which breaks re-running the
# notebook from the top.
df.write.parquet('fhv/2019/10/', mode='overwrite')

                                                                                

In [7]:
# Rebind df to the parquet copy of the data; every query below runs against
# these files rather than the original CSV.
df = spark.read.parquet('fhv/2019/10/')

In [8]:
# Total number of rows in the DataFrame read back from parquet.
df.count()

1897493

In [52]:
# Print column names and types as a tree to check that the typed schema
# (timestamps and integers) survived the parquet round trip.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [65]:
from pyspark.sql.functions import to_date, round

In [50]:
# Count the trips whose pickup timestamp falls on 15 October 2019.
# to_date drops the time-of-day part so the comparison is on the calendar date.
(
    df
    .where(to_date(df['pickup_datetime']) == '2019-10-15')
    .count()
)

62610

In [96]:
# List the longest trips first. trip_length is the drop-off/pickup difference
# divided by 3600 and cast to a two-decimal number.
# NOTE(review): presumably this converts seconds to hours — confirm the unit
# of the timestamp difference on the Spark version in use.
(
    df
    .select(
        ((df['dropOff_datetime'] - df['pickup_datetime']) / 3600)
        .cast('decimal(18, 2)')
        .alias('trip_length')
    )
    .orderBy('trip_length', ascending=False)
    .show()
)

+-----------+
|trip_length|
+-----------+
|  631152.50|
|  631152.50|
|   87672.44|
|   70128.03|
|    8794.00|
|    8784.17|
|    1464.53|
|    1056.83|
|    1056.27|
|     793.55|
|     793.39|
|     793.30|
|     793.00|
|     792.99|
|     792.86|
|     792.81|
|     792.79|
|     792.77|
|     792.75|
|     792.75|
+-----------+
only showing top 20 rows



In [9]:
# Load the taxi zone lookup table; only the header option is set, so no column
# types are supplied.
df_lookup = spark.read.csv('taxi_zone_lookup.csv', header="true")

In [15]:
# Peek at the first five lookup rows (returned as a list of Row objects).
df_lookup.head(5)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [13]:
# Check the lookup column types: every column, LocationID included, was read
# as a string because no schema was supplied when loading the CSV.
df_lookup.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [10]:
# Expose the zone lookup to Spark SQL under the name 'lookup'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; it replaces an existing view of the same name, so the
# cell can be re-run safely.
df_lookup.createOrReplaceTempView('lookup')



In [11]:
# Expose the trip data to Spark SQL under the name 'fhv_trip_data'.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; it replaces an existing view of the same name, so the
# cell can be re-run safely.
df.createOrReplaceTempView('fhv_trip_data')



In [16]:
# Count trips per pickup zone by joining the trips to the zone lookup on the
# pickup location ID. The result is ordered by trip_count ascending (the SQL
# default), so the least-used pickup zones are listed first.
# NOTE(review): PUlocationID is an integer while LocationID was read as a
# string, so the join relies on Spark's implicit cast — confirm this is intended.
spark.sql('''
select
    l.Zone,
    count(*) as trip_count
from fhv_trip_data t
inner join lookup l
on t.PUlocationID = l.LocationID
group by l.Zone
order by trip_count
''').show()

[Stage 11:>                                                         (0 + 6) / 6]

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|         Jamaica Bay|         1|
|Governor's Island...|         2|
| Green-Wood Cemetery|         5|
|       Broad Channel|         8|
|     Highbridge Park|        14|
|        Battery Park|        15|
|Saint Michaels Ce...|        23|
|Breezy Point/Fort...|        25|
|Marine Park/Floyd...|        26|
|        Astoria Park|        29|
|    Inwood Hill Park|        39|
|       Willets Point|        47|
|Forest Park/Highl...|        53|
|  Brooklyn Navy Yard|        57|
|        Crotona Park|        62|
|        Country Club|        77|
|     Freshkills Park|        89|
|       Prospect Park|        98|
|     Columbia Street|       105|
|  South Williamsburg|       110|
+--------------------+----------+
only showing top 20 rows



                                                                                