In [1]:
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types

In [2]:
# Start (or reuse, via getOrCreate) a local Spark session on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .getOrCreate()
)

23/06/16 17:37:09 WARN Utils: Your hostname, jose-MacBookPro resolves to a loopback address: 127.0.1.1; using 192.168.1.224 instead (on interface wlp2s0)
23/06/16 17:37:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/06/16 17:37:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [16]:
# Load every green-taxi Parquet partition and rename the lpep_* datetime columns
# to the service-neutral names shared with the yellow dataset.
df_g = (
    spark.read.parquet('data/pq/green/*/*')
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
)

In [17]:
# Inspect the green schema to confirm the pickup/dropoff renames took effect.
df_g.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [13]:
# Load every yellow-taxi Parquet partition and rename the tpep_* datetime columns
# to the service-neutral names shared with the green dataset.
df_y = (
    spark.read.parquet('data/pq/yellow/*/*')
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
)

In [14]:
# Inspect the yellow schema to confirm the pickup/dropoff renames took effect.
df_y.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [23]:
# Columns present in both datasets, kept in the green frame's column order so the
# same ordered list can be used to select from both frames before the union.
y_cols = set(df_y.columns)
common_cols = [name for name in df_g.columns if name in y_cols]

print(common_cols)

['VendorID', 'pickup_datetime', 'dropoff_datetime', 'store_and_fwd_flag', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'payment_type', 'congestion_surcharge']


In [31]:
# Project each service onto the shared columns and tag its rows with a literal label.
dfg_common = df_g.select(common_cols).withColumn("service_type", F.lit("green"))
dfy_common = df_y.select(common_cols).withColumn("service_type", F.lit("yellow"))

# union() resolves columns by position; both frames select the same ordered
# `common_cols` list plus `service_type`, so the columns line up.
trips_data = dfg_common.union(dfy_common)

# Register the combined frame so the SQL cells below can query it by name.
trips_data.createOrReplaceTempView('trips_data_all')

In [32]:
# Sanity check: row count per service in the combined DataFrame.
trips_data.groupBy("service_type").count().show()

                                                                                

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



In [35]:
# Same per-service row count as the DataFrame API cell above, expressed in SQL
# against the `trips_data_all` temp view.
spark.sql("""
SELECT
    service_type,
    COUNT(*) AS row_count
FROM trips_data_all
GROUP BY service_type
""").show()



+------------+---------+
|service_type|row_count|
+------------+---------+
|       green|  2304517|
|      yellow| 39649199|
+------------+---------+





In [37]:
# Monthly revenue report: one row per (pickup zone, calendar month, service type)
# over the combined green + yellow `trips_data_all` temp view.
# - GROUP BY 1, 2, 3 uses ordinals that refer to the first three SELECT expressions.
# - No trailing ';': spark.sql() takes a single statement, and older Spark SQL
#   parsers reject a terminating semicolon.
df_result = spark.sql("""
    SELECT
        -- revenue grouping
        PULocationID AS rev_zone,
        date_trunc('month', pickup_datetime) AS revenue_month,
        service_type,

        -- revenue aggs
        -- NOTE: revenue_monthly_fee sums fare_amount; the alias is kept unchanged
        -- so the schema of already-written report outputs stays the same.
        SUM(fare_amount) AS revenue_monthly_fee,
        SUM(extra) AS revenue_monthly_extra,
        SUM(mta_tax) AS revenue_monthly_mta_tax,
        SUM(tip_amount) AS revenue_monthly_tip_amount,
        SUM(tolls_amount) AS revenue_monthly_tolls_amount,
        SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
        SUM(total_amount) AS revenue_monthly_total_amount,
        SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

        -- other calculations
        AVG(passenger_count) AS avg_psg_count,
        AVG(trip_distance) AS avg_trip_dist
    FROM
        trips_data_all
    GROUP BY
        1, 2, 3
""")

### Writing to a "Data Lake"
The cell below writes the result back to the data lake as Parquet. Spark writes one file per partition, so if you want a single output file instead, coalesce to one partition first:
```python
df_result.coalesce(1).write.parquet('data/report/revenue/', mode='overwrite')
```

In [39]:
# Write the monthly revenue report back to the data lake as Parquet.
# (We wouldn't necessarily write to a data warehouse from here.)
# mode='overwrite' keeps this cell re-runnable: the default save mode
# ('errorifexists') fails once 'data/report/revenue' exists, e.g. on Restart & Run All.
# Overwrite replaces any previous contents of that directory.
df_result.write.parquet('data/report/revenue', mode='overwrite')

                                                                                

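An alternative way to find the fields present in both datasets is a plain set intersection. A set does not preserve column order, which is why the ordered list built earlier is used for the actual `select`.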

In [21]:
# Unordered set of the column names shared by the green and yellow frames.
set(df_g.columns).intersection(df_y.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}