In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/01 15:51:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
df_green = spark.read.parquet('data/pq/green/*/*')

24/03/01 15:51:53 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
df_green.createOrReplaceTempView('green')

In [7]:
df_green.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [4]:
df_green_revenue = spark.sql("""
                      SELECT
                        date_trunc('hour', lpep_pickup_datetime) AS hour,
                        PULocationID AS zone,
                             
                        SUM(total_amount) AS amount,
                        COUNT(1) AS number_records
                      FROM
                        green
                      WHERE
                        lpep_pickup_datetime >= '2020-01-01 00:00:00'
                      GROUP BY
                        1, 2
                      --ORDER BY
                        --1, 2
                      """)

In [5]:
df_green_revenue \
    .repartition(20) \
    .write.parquet('data/report/revenue/green', mode='overwrite')

24/02/28 19:17:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/02/28 19:17:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/02/28 19:17:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/02/28 19:17:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:17:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/02/28 19:17:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:17:41 WARN MemoryManager: Total allocation exceeds 95.

In [6]:
df_yellow = spark.read.parquet('data/pq/yellow/*/*')
df_yellow.createOrReplaceTempView('yellow')

In [7]:
df_yellow_revenue = spark.sql("""
                      SELECT
                        date_trunc('hour', tpep_pickup_datetime) AS hour,
                        PULocationID AS zone,
                             
                        SUM(total_amount) AS amount,
                        COUNT(1) AS number_records
                      FROM
                        yellow
                      WHERE
                        tpep_pickup_datetime >= '2020-01-01 00:00:00'
                      GROUP BY
                        1, 2
                      --ORDER BY
                        --1, 2
                      """)

In [8]:
df_yellow_revenue \
    .repartition(20) \
    .write.parquet('data/report/revenue/yellow', mode='overwrite')

24/02/28 19:17:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/02/28 19:17:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/02/28 19:17:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/02/28 19:17:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:17:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/02/28 19:17:53 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:17:53 WARN MemoryManager: Total allocation exceeds 95.

In [11]:
df_green_revenue_temp = df_green_revenue \
	.withColumnRenamed('amount', 'green_amount') \
	.withColumnRenamed('number_records', 'green_number_records')

df_yellow_revenue_temp = df_yellow_revenue \
	.withColumnRenamed('amount', 'yellow_amount') \
	.withColumnRenamed('number_records', 'yellow_number_records')

In [15]:
df_join = df_green_revenue_temp.join(df_yellow_revenue_temp, on=['hour','zone'], how='outer')

In [16]:
df_join.show()



+-------------------+----+------------------+--------------------+------------------+---------------------+
|               hour|zone|      green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------------+--------------------+------------------+---------------------+
|2020-01-01 00:00:00|  36|295.34000000000003|                  11|            109.17|                    3|
|2020-01-01 00:00:00|  45|              NULL|                NULL| 732.4800000000002|                   42|
|2020-01-01 00:00:00|  59|50.900000000000006|                   3|              NULL|                 NULL|
|2020-01-01 00:00:00|  79|              NULL|                NULL|12573.810000000034|                  721|
|2020-01-01 00:00:00|  90|              NULL|                NULL| 5010.450000000003|                  266|
|2020-01-01 00:00:00| 114|              NULL|                NULL| 6256.430000000006|                  333|
|2020-01-01 00:00:00| 190|  

                                                                                

In [17]:
df_join.write.parquet('data/report/revenue/total')

24/02/28 19:22:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/02/28 19:22:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/02/28 19:22:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/02/28 19:22:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:22:36 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/02/28 19:22:37 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:22:37 WARN MemoryManager: Total allocation exceeds 95.

In [18]:
df_zones = spark.read.parquet('zones/*')

In [22]:
df_result = df_join.join(df_zones, df_join.zone == df_zones.LocationID)

In [23]:
df_result.drop('LocationID', 'zone').write.parquet('tmp/revenue-zones')

24/02/28 19:39:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/02/28 19:39:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/02/28 19:39:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/02/28 19:39:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:39:48 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/02/28 19:39:49 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/02/28 19:39:49 WARN MemoryManager: Total allocation exceeds 95.

In [5]:
df_green.rdd

MapPartitionsRDD[7] at javaToPython at NativeMethodAccessorImpl.java:0

In [6]:
df_green.take(5)

[Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 14, 32), lpep_dropoff_datetime=datetime.datetime(2020, 1, 29, 14, 37, 24), store_and_fwd_flag='N', RatecodeID=1, PULocationID=97, DOLocationID=97, passenger_count=1, trip_distance=0.81, fare_amount=5.5, extra=0.0, mta_tax=0.5, tip_amount=1.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.3, total_amount=7.3, payment_type=1, trip_type=1, congestion_surcharge=0.0),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 24, 4, 36, 44), lpep_dropoff_datetime=datetime.datetime(2020, 1, 24, 4, 46, 50), store_and_fwd_flag='N', RatecodeID=5, PULocationID=7, DOLocationID=141, passenger_count=1, trip_distance=3.02, fare_amount=40.0, extra=0.0, mta_tax=0.0, tip_amount=0.0, tolls_amount=0.0, ehail_fee=None, improvement_surcharge=0.0, total_amount=42.75, payment_type=1, trip_type=2, congestion_surcharge=2.75),
 Row(VendorID=2, lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 52), lpep_dropoff_datetime=

In [7]:
rdd = df_green \
    .select('lpep_pickup_datetime', 'PULocationID', 'total_amount') \
    .rdd

In [8]:
rdd.take(5)

                                                                                

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 14, 32), PULocationID=97, total_amount=7.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 24, 4, 36, 44), PULocationID=7, total_amount=42.75),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 23, 13, 52), PULocationID=43, total_amount=8.3),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 26, 22, 18, 3), PULocationID=260, total_amount=6.8),
 Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 4, 16, 0), PULocationID=177, total_amount=19.78)]

In [9]:
from datetime import datetime

start = datetime(year=2020, month=1, day=1)

In [10]:
def filter_outliers(row):
    return row.lpep_pickup_datetime >= start

In [11]:
rdd \
    .filter(filter_outliers) \
    .take(1)

[Row(lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 14, 32), PULocationID=97, total_amount=7.3)]

In [13]:
rows = rdd.take(10)
row = rows[0]

In [16]:
def prepare_for_grouping(row):
	# key is hour and zone
	hour = row.lpep_pickup_datetime.replace(minute=0, second=0, microsecond=0)
	zone = row.PULocationID
	key = (hour, zone)

	# value is sum of amounts
	amount = row.total_amount
	count = 1
	value = (amount, count)

	return (key, value)

In [17]:
prepare_for_grouping(row)

((datetime.datetime(2020, 1, 29, 14, 0), 97), (7.3, 1))

In [18]:
rdd \
	.filter(filter_outliers) \
	.map(prepare_for_grouping) \
	.take(10)

[((datetime.datetime(2020, 1, 29, 14, 0), 97), (7.3, 1)),
 ((datetime.datetime(2020, 1, 24, 4, 0), 7), (42.75, 1)),
 ((datetime.datetime(2020, 1, 23, 13, 0), 43), (8.3, 1)),
 ((datetime.datetime(2020, 1, 26, 22, 0), 260), (6.8, 1)),
 ((datetime.datetime(2020, 1, 4, 16, 0), 177), (19.78, 1)),
 ((datetime.datetime(2020, 1, 21, 23, 0), 24), (10.56, 1)),
 ((datetime.datetime(2020, 1, 25, 19, 0), 82), (15.3, 1)),
 ((datetime.datetime(2020, 1, 24, 21, 0), 260), (7.8, 1)),
 ((datetime.datetime(2020, 1, 3, 11, 0), 130), (4.3, 1)),
 ((datetime.datetime(2020, 1, 9, 7, 0), 218), (12.4, 1))]

In [19]:
def calculate_revenue(left_value, right_value):
		# in our example the value is amount, 1
		left_amount, left_count = left_value
		right_amount, right_count = right_value

		output_amount = left_amount + right_amount
		output_count = left_count + right_count

		return (output_amount, output_count)

In [20]:
rdd \
	.filter(filter_outliers) \
	.map(prepare_for_grouping) \
	.reduceByKey(calculate_revenue) \
	.take(10)

                                                                                

[((datetime.datetime(2020, 1, 26, 11, 0), 166), (373.69000000000005, 23)),
 ((datetime.datetime(2020, 1, 30, 17, 0), 82), (503.90000000000015, 37)),
 ((datetime.datetime(2020, 1, 2, 18, 0), 244), (502.0000000000001, 24)),
 ((datetime.datetime(2020, 1, 7, 15, 0), 33), (669.7099999999999, 35)),
 ((datetime.datetime(2020, 1, 16, 12, 0), 166), (380.18000000000006, 30)),
 ((datetime.datetime(2020, 1, 26, 14, 0), 33), (295.02000000000004, 17)),
 ((datetime.datetime(2020, 1, 4, 15, 0), 97), (629.7300000000001, 46)),
 ((datetime.datetime(2020, 1, 26, 9, 0), 226), (56.769999999999996, 2)),
 ((datetime.datetime(2020, 1, 24, 6, 0), 116), (246.46000000000004, 11)),
 ((datetime.datetime(2020, 1, 3, 15, 0), 82), (522.7600000000001, 34))]

In [21]:
from collections import namedtuple

RevenueRow = namedtuple('RevenueRow', ['hour', 'zone', 'revenue', 'count'])

In [22]:
def unwrap(row):
	return RevenueRow(
	hour=row[0][0],
	zone=row[0][1],
	revenue=row[1][0],
	count=row[1][1]
	)

In [23]:
rdd \
	.filter(filter_outliers) \
	.map(prepare_for_grouping) \
	.reduceByKey(calculate_revenue) \
	.map(unwrap) \
	.toDF() \
	.show()

                                                                                

+-------------------+----+------------------+-----+
|               hour|zone|           revenue|count|
+-------------------+----+------------------+-----+
|2020-01-26 11:00:00| 166|373.69000000000005|   23|
|2020-01-30 17:00:00|  82|503.90000000000015|   37|
|2020-01-02 18:00:00| 244| 502.0000000000001|   24|
|2020-01-07 15:00:00|  33| 669.7099999999999|   35|
|2020-01-16 12:00:00| 166|380.18000000000006|   30|
|2020-01-26 14:00:00|  33|295.02000000000004|   17|
|2020-01-04 15:00:00|  97| 629.7300000000001|   46|
|2020-01-26 09:00:00| 226|56.769999999999996|    2|
|2020-01-24 06:00:00| 116|246.46000000000004|   11|
|2020-01-03 15:00:00|  82| 522.7600000000001|   34|
|2020-01-29 04:00:00|   7|             69.45|    6|
|2020-01-07 16:00:00|  97| 625.4399999999997|   34|
|2020-01-25 19:00:00| 173|              19.6|    2|
|2020-01-16 12:00:00|  74| 798.1199999999994|   64|
|2020-01-01 01:00:00|  66|326.46000000000004|   17|
|2020-01-25 15:00:00| 134|              78.7|    4|
|2020-01-24 

In [24]:
df_result = rdd \
	.filter(filter_outliers) \
	.map(prepare_for_grouping) \
	.reduceByKey(calculate_revenue) \
	.map(unwrap) \
	.toDF()

                                                                                

In [25]:
df_result.schema

StructType([StructField('hour', TimestampType(), True), StructField('zone', LongType(), True), StructField('revenue', DoubleType(), True), StructField('count', LongType(), True)])

In [26]:
from pyspark.sql import types

In [27]:
schema = types.StructType([
    types.StructField('hour', types.TimestampType(), True), 
    types.StructField('zone', types.IntegerType(), True), 
    types.StructField('revenue', types.DoubleType(), True), 
    types.StructField('count', types.IntegerType(), True)
    ])

In [28]:
df_result = rdd \
	.filter(filter_outliers) \
	.map(prepare_for_grouping) \
	.reduceByKey(calculate_revenue) \
	.map(unwrap) \
	.toDF(schema)

In [29]:
df_result.write.parquet('tmp/green-revenue')

24/03/01 17:03:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/03/01 17:03:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/03/01 17:03:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
24/03/01 17:03:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/03/01 17:03:40 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 63.33% for 12 writers
24/03/01 17:04:29 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 69.09% for 11 writers
24/03/01 17:04:29 WARN MemoryManager: Total allocation exceeds 95.

In [31]:
cols = ['VendorID', 'lpep_pickup_datetime', 'PULocationID', 'DOLocationID', 'trip_distance']

df_green \
	.select(cols) \
	.show()

+--------+--------------------+------------+------------+-------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|
+--------+--------------------+------------+------------+-------------+
|       2| 2020-01-29 14:32:00|          97|          97|         0.81|
|       2| 2020-01-24 04:36:44|           7|         141|         3.02|
|       2| 2020-01-23 13:52:00|          43|          24|         1.32|
|       2| 2020-01-26 22:18:03|         260|         260|         0.89|
|    NULL| 2020-01-04 16:00:00|         177|          49|         2.83|
|       2| 2020-01-21 23:02:04|          24|          41|         1.33|
|       2| 2020-01-25 19:09:59|          82|         145|         3.41|
|       1| 2020-01-24 21:30:37|         260|           7|          1.5|
|       2| 2020-01-03 11:37:21|         130|         130|          0.2|
|    NULL| 2020-01-09 07:06:00|         218|          38|         2.47|
|       2| 2020-01-18 18:53:01|         188|         122|       

In [32]:
duration_rdd = df_green \
    .select(cols) \
    .rdd

In [34]:
def apply_model_in_batch(partition):
	return [1]

duration_rdd.mapPartitions(apply_model_in_batch).collect()

[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

In [36]:
def apply_model_in_batch(partition):
	# len doesn't work on partition
	cnt = 0
	for row in partition:
		cnt += 1
	return [cnt]

duration_rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[447770,
 454484,
 238894,
 154607,
 154692,
 150493,
 148651,
 144966,
 135114,
 117302,
 107592,
 49952]

In [37]:
import pandas as pd

rows = duration_rdd.take(10)
pd.DataFrame(rows, columns = cols)

Unnamed: 0,VendorID,lpep_pickup_datetime,PULocationID,DOLocationID,trip_distance
0,2.0,2020-01-29 14:32:00,97,97,0.81
1,2.0,2020-01-24 04:36:44,7,141,3.02
2,2.0,2020-01-23 13:52:00,43,24,1.32
3,2.0,2020-01-26 22:18:03,260,260,0.89
4,,2020-01-04 16:00:00,177,49,2.83
5,2.0,2020-01-21 23:02:04,24,41,1.33
6,2.0,2020-01-25 19:09:59,82,145,3.41
7,1.0,2020-01-24 21:30:37,260,7,1.5
8,2.0,2020-01-03 11:37:21,130,130,0.2
9,,2020-01-09 07:06:00,218,38,2.47


In [38]:
def apply_model_in_batch(partition):
	df = pd.DataFrame(partition, columns=cols)
	cnt = len(df)
	return [cnt]

In [39]:
duration_rdd.mapPartitions(apply_model_in_batch).collect()

                                                                                

[447770,
 454484,
 238894,
 154607,
 154692,
 150493,
 148651,
 144966,
 135114,
 117302,
 107592,
 49952]

In [40]:
def model_predict(df):
	y_pred = df.trip_distance * 5
	return y_pred

In [62]:
def apply_model_in_batch(rows):
	df = pd.DataFrame(rows, columns=cols)
	predictions = model_predict(df)
	df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
	df['predicted_duration'] = predictions
	
	for row in df.itertuples():
		yield row

In [44]:
duration_rdd.mapPartitions(apply_model_in_batch).take(10)

                                                                                

[Pandas(Index=0, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-29 14:32:00'), PULocationID=97, DOLocationID=97, trip_distance=0.81, predicted_duration=4.050000000000001),
 Pandas(Index=1, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-24 04:36:44'), PULocationID=7, DOLocationID=141, trip_distance=3.02, predicted_duration=15.1),
 Pandas(Index=2, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-23 13:52:00'), PULocationID=43, DOLocationID=24, trip_distance=1.32, predicted_duration=6.6000000000000005),
 Pandas(Index=3, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-26 22:18:03'), PULocationID=260, DOLocationID=260, trip_distance=0.89, predicted_duration=4.45),
 Pandas(Index=4, VendorID=nan, lpep_pickup_datetime=Timestamp('2020-01-04 16:00:00'), PULocationID=177, DOLocationID=49, trip_distance=2.83, predicted_duration=14.15),
 Pandas(Index=5, VendorID=2.0, lpep_pickup_datetime=Timestamp('2020-01-21 23:02:04'), PULocationID=24, DOLocationID=41, trip_distance=1.33,

In [63]:
schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True), 
    types.StructField('lpep_pickup_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('trip_distance', types.DoubleType(), True), 
    types.StructField('predicted_duration', types.DoubleType(), True)
    ])
duration_rdd.mapPartitions(apply_model_in_batch).toDF().show()

[Stage 37:>                                                         (0 + 1) / 1]

+-----+--------+--------------------+------------+------------+-------------+------------------+
|Index|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|predicted_duration|
+-----+--------+--------------------+------------+------------+-------------+------------------+
|    0|     2.0|                  {}|          97|          97|         0.81| 4.050000000000001|
|    1|     2.0|                  {}|           7|         141|         3.02|              15.1|
|    2|     2.0|                  {}|          43|          24|         1.32|6.6000000000000005|
|    3|     2.0|                  {}|         260|         260|         0.89|              4.45|
|    4|     NaN|                  {}|         177|          49|         2.83|             14.15|
|    5|     2.0|                  {}|          24|          41|         1.33|              6.65|
|    6|     2.0|                  {}|          82|         145|         3.41|             17.05|
|    7|     1.0|              

                                                                                

In [79]:
# Sample data as a Pandas DataFrame
data = {
    'VendorID': [2.0, 2.0, 2.0, 2.0, None, 2.0, 2.0, 1.0, 2.0, None],
    'lpep_pickup_datetime': [
        pd.Timestamp('2020-01-29 14:32:00'), pd.Timestamp('2020-01-24 04:36:44'), pd.Timestamp('2020-01-23 13:52:00'), pd.Timestamp('2020-01-26 22:18:03'),
        pd.Timestamp('2020-01-04 16:00:00'), pd.Timestamp('2020-01-21 23:02:04'), pd.Timestamp('2020-01-25 19:09:59'), pd.Timestamp('2020-01-24 21:30:37'),
        pd.Timestamp('2020-01-03 11:37:21'), pd.Timestamp('2020-01-09 07:06:00')
    ],
    'PULocationID': [97, 7, 43, 260, 177, 24, 82, 260, 130, 218],
    'DOLocationID': [97, 141, 24, 260, 49, 41, 145, 7, 130, 38],
    'trip_distance': [0.81, 3.02, 1.32, 0.89, 2.83, 1.33, 3.41, 1.5, 0.2, 2.47],
}

# Create a Pandas DataFrame
pandas_df = pd.DataFrame(data)

# Create a schema for the Spark DataFrame
schema = types.StructType([
    types.StructField("VendorID", types.FloatType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("trip_distance", types.FloatType(), True)
])

# Convert the Pandas DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(pandas_df, schema=schema)

# Show the Spark DataFrame
spark_df.show(truncate=False)


+--------+--------------------+------------+------------+-------------+
|VendorID|lpep_pickup_datetime|PULocationID|DOLocationID|trip_distance|
+--------+--------------------+------------+------------+-------------+
|2.0     |2020-01-29 14:32:00 |97          |97          |0.81         |
|2.0     |2020-01-24 04:36:44 |7           |141         |3.02         |
|2.0     |2020-01-23 13:52:00 |43          |24          |1.32         |
|2.0     |2020-01-26 22:18:03 |260         |260         |0.89         |
|NaN     |2020-01-04 16:00:00 |177         |49          |2.83         |
|2.0     |2020-01-21 23:02:04 |24          |41          |1.33         |
|2.0     |2020-01-25 19:09:59 |82          |145         |3.41         |
|1.0     |2020-01-24 21:30:37 |260         |7           |1.5          |
|2.0     |2020-01-03 11:37:21 |130         |130         |0.2          |
|NaN     |2020-01-09 07:06:00 |218         |38          |2.47         |
+--------+--------------------+------------+------------+-------

In [80]:
spark_df.rdd.take(2)

[Row(VendorID=2.0, lpep_pickup_datetime=datetime.datetime(2020, 1, 29, 14, 32), PULocationID=97, DOLocationID=97, trip_distance=0.8100000023841858),
 Row(VendorID=2.0, lpep_pickup_datetime=datetime.datetime(2020, 1, 24, 4, 36, 44), PULocationID=7, DOLocationID=141, trip_distance=3.0199999809265137)]

In [None]:
spark_df.rdd.mapPartitions(to_date).rdd.take(1)

In [104]:
pandas_df["lpep_pickup_datetime"].dt

<pandas.core.indexes.accessors.DatetimeProperties object at 0x12846dcc0>

In [171]:
def to_date(partition):
    # print(f"{partition=}")
    # print(f"{list(partition)=}")
    df = pd.DataFrame(partition, columns=cols)
    df['lpep_pickup_datetime'] = pd.to_datetime(df['lpep_pickup_datetime'])
    yield from df.itertuples(index=False)

In [173]:
schema = types.StructType([
    types.StructField("VendorID", types.FloatType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("trip_distance", types.FloatType(), True),
    # types.StructField("blah", types.TimestampType(), True)
])

spark_df.rdd.mapPartitions(to_date).toDF(schema).show()

24/03/01 20:08:39 ERROR Executor: Exception in task 3.0 in stage 174.0 (TID 571)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    r

Py4JJavaError: An error occurred while calling o2314.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 174.0 failed 1 times, most recent failure: Lost task 3.0 in stage 174.0 (TID 571) (math-47770.lan executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/session.py", line 1459, in prepare
    verify_func(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2187, in verify
    verify_value(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2160, in verify_struct
    verifier(v)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2187, in verify
    verify_value(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2181, in verify_default
    verify_acceptable_types(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2006, in verify_acceptable_types
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `TimestampType()` can not accept object `2020-01-26 22:18:03` in type `Timestamp`.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at jdk.internal.reflect.GeneratedMethodAccessor107.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:52)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/session.py", line 1459, in prepare
    verify_func(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2187, in verify
    verify_value(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2160, in verify_struct
    verifier(v)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2187, in verify
    verify_value(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2181, in verify_default
    verify_acceptable_types(obj)
  File "/Users/jessicadesilva/Documents/data_engineering_zoomcamp/week_1/.venv/lib/python3.10/site-packages/pyspark/sql/types.py", line 2006, in verify_acceptable_types
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `TimestampType()` can not accept object `2020-01-26 22:18:03` in type `Timestamp`.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	... 1 more


24/03/02 00:36:15 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 976374 ms exceeds timeout 120000 ms
24/03/02 00:36:15 WARN SparkContext: Killing executors is not supported by current scheduler.
24/03/02 00:53:54 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$