In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/04 20:44:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [12]:
spark.version

'3.3.2'

In [13]:
src_filename='fhv_tripdata_2019-10.csv.gz'

In [3]:
df = spark.read \
    .option("header", "true") \
    .csv(src_filename)

df.show()

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [4]:
import pandas as pd

df_pd = pd.read_csv(src_filename)
df_pd.head()

Unnamed: 0,dispatching_base_num,pickup_datetime,dropOff_datetime,PUlocationID,DOlocationID,SR_Flag,Affiliated_base_number
0,B00009,2019-10-01 00:23:00,2019-10-01 00:35:00,264.0,264.0,,B00009
1,B00013,2019-10-01 00:11:29,2019-10-01 00:13:22,264.0,264.0,,B00013
2,B00014,2019-10-01 00:11:43,2019-10-01 00:37:20,264.0,264.0,,B00014
3,B00014,2019-10-01 00:56:29,2019-10-01 00:57:47,264.0,264.0,,B00014
4,B00014,2019-10-01 00:23:09,2019-10-01 00:28:27,264.0,264.0,,B00014


In [8]:
from pyspark.sql import types

schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),    
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropOff_datetime',types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv(src_filename)

In [11]:
df.show(10)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [14]:
df = df.repartition(6)

df.write.parquet('fhvhv/2019/10/', mode="overwrite")

                                                                                

In [15]:
df = spark.read.parquet('fhvhv/2019/10/')

In [17]:
df.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02784|2019-10-01 09:55:38|2019-10-01 10:05:43|          89|          85|   null|                  null|
|              B02429|2019-10-21 04:15:47|2019-10-21 04:36:04|         264|         264|   null|                B02429|
|              B01482|2019-10-19 12:00:00|2019-10-19 12:20:00|         264|         264|   null|                B01482|
|              B03015|2019-10-11 14:28:00|2019-10-11 14:32:44|         264|         216|   null|                B03015|
|              B01529|2019-10-21 18:00:26|2019-10-21 18:07:21|         264|          80|   null|                B01529|
+--------------------+------------------

In [18]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [19]:
from pyspark.sql import functions as F

In [23]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropOff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

1897493

In [26]:
df.filter(F.to_date(df.pickup_datetime) == '2019-10-15').count()

62610

In [57]:
from datetime import datetime

def dt_diff_in_hours(dt_start, dt_end):
    if (dt_start is None or dt_end is None):
        return 0
    return (dt_end - dt_start).total_seconds() / 3600


In [62]:
fmt = '%Y-%m-%d %H:%M:%S'
t1 = datetime.strptime('2019-10-01 10:05:43', fmt)
t2 = datetime.strptime('2019-10-02 10:05:43', fmt)
dt_diff_in_hours(t1, F.to_timestamp('2019-10-02 10:05:43'))

TypeError: 'Column' object is not callable

In [68]:
dt_diff_in_hours_udf=F.udf(dt_diff_in_hours,  returnType= types.DoubleType())

In [70]:
df \
    .withColumn('hours', dt_diff_in_hours_udf('pickup_datetime', 'dropOff_datetime')) \
    .select('pickup_datetime', 'dropOff_datetime', 'hours') \
    .show()

#this does not work


24/03/04 22:44:03 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 42)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 670, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 507, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 289, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/s

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 670, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
                                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 507, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 289, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/home/nicolai/spark/spark-3.3.2-bin-hadoop3/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: code() argument 13 must be str, not int


In [71]:
df.registerTempTable('trips_data')





In [82]:
spark.sql("""
SELECT (bigint(to_timestamp(dropOff_datetime)) - bigint(to_timestamp(pickup_datetime)))/3600 as travel_hours, 
    pickup_datetime, 
    dropOff_datetime
from 
    trips_data 
order by bigint(dropOff_datetime) - bigint(pickup_datetime) desc
limit 10;
""").show()

+------------------+-------------------+-------------------+
|      travel_hours|    pickup_datetime|   dropOff_datetime|
+------------------+-------------------+-------------------+
|          631152.5|2019-10-11 18:00:00|2091-10-11 18:30:00|
|          631152.5|2019-10-28 09:00:00|2091-10-28 09:30:00|
| 87672.44083333333|2019-10-31 23:46:33|2029-11-01 00:13:00|
| 70128.02805555555|2019-10-01 21:43:42|2027-10-01 21:45:23|
|            8794.0|2019-10-17 14:00:00|2020-10-18 00:00:00|
| 8784.166666666666|2019-10-26 21:26:00|2020-10-26 21:36:00|
|1464.5344444444445|2019-10-30 12:30:04|2019-12-30 13:02:08|
|1056.8266666666666|2019-10-25 07:04:57|2019-12-08 07:54:33|
|1056.2705555555556|2019-10-25 07:04:57|2019-12-08 07:21:11|
| 793.5530555555556|2019-10-01 13:47:17|2019-11-03 15:20:28|
+------------------+-------------------+-------------------+





In [84]:
spark.sql("""
SELECT PULocationID, COUNT(*)
from trips_data 
group by PULocationID
limit 10;
""").show()

+------------+--------+
|PULocationID|count(1)|
+------------+--------+
|         148|     684|
|         243|     732|
|          31|     264|
|         137|    2308|
|         251|    1317|
|          85|     475|
|          65|     925|
|          53|    1410|
|         255|     273|
|         133|     393|
|          78|    2909|
|         155|     644|
|         108|     697|
|         193|    2310|
|         211|     235|
|          34|      57|
|         126|    1059|
|         115|     632|
|         101|     719|
|          81|     786|
+------------+--------+
only showing top 20 rows



In [86]:
schema = types.StructType([
    types.StructField('locationid', types.IntegerType(), True),    
    types.StructField('borough', types.StringType(), True), 
    types.StructField('zone',types.StringType(), True), 
    types.StructField('service_zone',types.StringType(), True)
])

df_zones = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('taxi_zone_lookup.csv')

df_zones.show()

+----------+-------------+--------------------+------------+
|locationid|      borough|                zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [87]:
df_zones.registerTempTable('zones')

In [92]:
spark.sql("""
SELECT zone, COUNT(*)
--t.pickup_datetime, t.PULocationID, z.*
FROM 
    trips_data t 
        JOIN
    zones z
        ON t.PULocationID = z.locationid
        
GROUP BY zone
ORDER BY COUNT(*)
--limit 10;
""").show()

+--------------------+--------+
|                zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
|       Broad Channel|       8|
|     Highbridge Park|      14|
|        Battery Park|      15|
|Saint Michaels Ce...|      23|
|Breezy Point/Fort...|      25|
|Marine Park/Floyd...|      26|
|        Astoria Park|      29|
|    Inwood Hill Park|      39|
|       Willets Point|      47|
|Forest Park/Highl...|      53|
|  Brooklyn Navy Yard|      57|
|        Crotona Park|      62|
|        Country Club|      77|
|     Freshkills Park|      89|
|       Prospect Park|      98|
|     Columbia Street|     105|
|  South Williamsburg|     110|
+--------------------+--------+
only showing top 20 rows





In [None]:
#spark.stop()