In [1]:
import pyspark
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col
from pyspark.sql import types

In [2]:
# Connect to the Spark master at the spark:// URL below and obtain (or reuse)
# the session that every later cell refers to as `spark`.
spark = (
    SparkSession.builder
    .master("spark://de-zoomcamp.us-central1-c.c.shareprompt-412612.internal:7077")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/01 15:00:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/01 15:00:42 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
# Bare expression: lets the notebook display the session object as the cell result.
spark

In [6]:
# Read every parquet file matched by the two-level glob under data/raw/green
# into a single DataFrame.
df_green = spark.read.parquet(
    "data/raw/green/*/*"
)

                                                                                

In [7]:
# Re-type the pickup column as `timestamp`, replacing the column in place
# (same column name); no other column is touched by this cell.
pickup_as_timestamp = col("lpep_pickup_datetime").cast("timestamp")
df_green = df_green.withColumn("lpep_pickup_datetime", pickup_as_timestamp)

In [8]:
# Print the column names, types and nullability of df_green.
df_green.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [9]:
# Expose df_green to Spark SQL under the view name `trips_data`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
df_green.createOrReplaceTempView('trips_data')



In [10]:
# Aggregate the `trips_data` view per (pickup zone, pickup hour): sum of
# total_amount and row count, sorted by zone then hour.
green_revenue_sql = """
SELECT
    -- Revenue grouping 
    PULocationID AS zone,
    date_trunc("hour", lpep_pickup_datetime) AS hour,

    -- Revenue calculation 
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records

FROM
    trips_data
GROUP BY 1,2
ORDER BY 1,2
"""
df_green_result = spark.sql(green_revenue_sql)

In [11]:
# Print a preview of the aggregated green result as a text table.
df_green_result.show()

                                                                                

+----+-------------------+------+--------------+
|zone|               hour|amount|number_records|
+----+-------------------+------+--------------+
|   1|2020-01-01 03:00:00| 155.3|             1|
|   1|2020-01-03 05:00:00|117.39|             1|
|   1|2020-01-26 10:00:00| 85.56|             1|
|   1|2020-01-29 18:00:00|108.36|             1|
|   1|2020-02-01 06:00:00|115.56|             1|
|   1|2020-02-06 07:00:00|  98.3|             1|
|   1|2020-02-16 07:00:00|  95.3|             1|
|   1|2020-02-17 13:00:00|  82.3|             1|
|   1|2020-02-26 17:00:00| 49.38|             1|
|   1|2020-03-02 15:00:00|  88.3|             1|
|   1|2020-03-11 12:00:00|103.56|             1|
|   1|2020-03-12 11:00:00|  61.1|             1|
|   1|2020-03-14 11:00:00|  16.0|             1|
|   1|2020-03-21 17:00:00| 100.3|             1|
|   1|2020-04-10 13:00:00| 150.3|             1|
|   1|2020-04-23 08:00:00| 37.85|             1|
|   1|2020-07-12 07:00:00| 79.05|             1|
|   1|2020-07-15 06:

In [12]:
# Persist the green aggregate as parquet in 20 partitions, replacing any
# previous output at this path.
# NOTE(review): repartition(20) reshuffles the rows, so the ORDER BY applied in
# the query presumably does not carry over to the written files — confirm
# whether that sort is still wanted.
(
    df_green_result
    .repartition(20)
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/green')
)

                                                                                

In [13]:
# Print the URL of the Spark web UI attached to this session's SparkContext.
print(spark.sparkContext.uiWebUrl)


http://de-zoomcamp.us-central1-c.c.shareprompt-412612.internal:4041


In [14]:
# Read every parquet file matched by the two-level glob under data/raw/yellow
# into a single DataFrame.
df_yellow = spark.read.parquet(
    "data/raw/yellow/*/*"
)

In [15]:
# Expose df_yellow to Spark SQL under the view name `yellow_trips_data`.
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
df_yellow.createOrReplaceTempView('yellow_trips_data')

In [16]:
# Aggregate the `yellow_trips_data` view per (pickup zone, pickup hour): sum of
# total_amount and row count. No ordering is requested here.
yellow_revenue_sql = """
SELECT
    -- Revenue grouping 
    PULocationID AS zone,
    date_trunc("hour", tpep_pickup_datetime) AS hour,

    -- Revenue calculation 
    SUM(total_amount) AS amount,
    COUNT(1) AS number_records

FROM
    yellow_trips_data
GROUP BY 1,2
"""
df_yellow_result = spark.sql(yellow_revenue_sql)

In [17]:
# Persist the yellow aggregate as parquet in 20 partitions, replacing any
# previous output at this path.
(
    df_yellow_result
    .repartition(20)
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/yellow')
)

                                                                                

In [18]:
# Rebind df_green_result to the parquet report stored under
# data/report/revenue/green, so later cells read from disk.
df_green_result = spark.read.parquet(
    'data/report/revenue/green'
)

In [19]:
# Rebind df_yellow_result to the parquet report stored under
# data/report/revenue/yellow, so later cells read from disk.
df_yellow_result = spark.read.parquet(
    'data/report/revenue/yellow'
)

In [20]:
# Prefix the measure columns with `green_` so they stay distinguishable from
# the yellow measures once both frames are joined.
df_green_revenue_tmp = (
    df_green_result
    .withColumnRenamed('amount', 'green_amount')
    .withColumnRenamed('number_records', 'green_number_records')
)

In [21]:
# Prefix the measure columns with `yellow_` so they stay distinguishable from
# the green measures once both frames are joined.
df_yellow_revenue_tmp = (
    df_yellow_result
    .withColumnRenamed('amount', 'yellow_amount')
    .withColumnRenamed('number_records', 'yellow_number_records')
)

In [22]:
# Outer join on (hour, zone): a bucket present in only one of the two frames
# is kept, with the other frame's measure columns left NULL.
df_join = df_green_revenue_tmp.join(
    df_yellow_revenue_tmp,
    on=["hour", "zone"],
    how="outer",
)

In [23]:
# Print a preview of the joined green/yellow revenue table.
df_join.show()

[Stage 29:>                                                         (0 + 1) / 1]

+-------------------+----+------------+--------------------+------------------+---------------------+
|               hour|zone|green_amount|green_number_records|     yellow_amount|yellow_number_records|
+-------------------+----+------------+--------------------+------------------+---------------------+
|2003-01-01 00:00:00| 193|        NULL|                NULL|               0.0|                    1|
|2008-12-31 19:00:00|  74|       18.05|                   1|              NULL|                 NULL|
|2008-12-31 23:00:00|  43|        NULL|                NULL|             79.66|                    4|
|2008-12-31 23:00:00| 116|        NULL|                NULL|              29.3|                    1|
|2008-12-31 23:00:00| 141|        NULL|                NULL|              23.6|                    2|
|2008-12-31 23:00:00| 142|        NULL|                NULL|37.760000000000005|                    3|
|2008-12-31 23:00:00| 163|        NULL|                NULL|               9.3|   

                                                                                

In [24]:
# Persist the joined revenue table as parquet, replacing any previous output
# at this path.
(
    df_join
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/total')
)

                                                                                

In [25]:
# Print the URL of the Spark web UI attached to this session's SparkContext.
print(spark.sparkContext.uiWebUrl)


http://de-zoomcamp.us-central1-c.c.shareprompt-412612.internal:4041
