In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master('local[*]').appName('05 DataFrame') \
.getOrCreate()

In [3]:
df_yellow = spark.read.parquet('../data/s3/yellow_tripdata_2021-01.parquet',inferSchema=True)

In [6]:
df_green = spark.read.parquet('../data/s3/green_tripdata_202*-01.parquet',inferSchema=True)

In [9]:
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'airport_fee']

In [10]:
df_green.columns

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [13]:
set(df_yellow.columns) & set(df_green.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [14]:
df_green = df_green.withColumnRenamed('lpep_pickup_datetime','tpep_pickup_datetime')
df_green = df_green.withColumnRenamed('lpep_dropoff_datetime','tpep_dropoff_datetime')


In [16]:
df_green.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [20]:
comon_columns = []

for col in df_green.columns:
    if col in df_yellow.columns:
        comon_columns.append(col)
        

In [22]:
from pyspark.sql import functions as F

In [29]:
df_green_sel = df_green.select(comon_columns)\
    .withColumn('service_type',F.lit('green'))

In [28]:
df_yellow_sel = df_green.select(comon_columns)\
    .withColumn('service_type',F.lit('yellow'))

In [30]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [31]:
df_trips_data

DataFrame[VendorID: bigint, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: double, PULocationID: bigint, DOLocationID: bigint, passenger_count: double, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, payment_type: double, congestion_surcharge: double, service_type: string]

In [33]:
print(df_green_sel.count(),
      df_yellow_sel.count(),
      df_trips_data.count())

139013 139013 278026


In [35]:
df_trips_data.groupBy('service_type').count().show()

+------------+------+
|service_type| count|
+------------+------+
|       green|139013|
|      yellow|139013|
+------------+------+



In [37]:
df_trips_data.createGlobalTempView('df_trips')

In [39]:
spark.sql('''
    SELECT * FROM df_trips LIMIT 10
''').show()

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|       2| 2020-12-31 19:15:56|  2020-12-31 19:19:52|                 N|       1.0|          43|         151|            1.0|         1.01|        5.5|  0.5|    0.5|       0.0|        