In [0]:
# Azure storage access info
# These four values are combined below into the wasbs:// path and the Spark
# configuration key under which the SAS token is registered.
blob_account_name = "azureopendatastorage"
blob_container_name = "nyctlc"
blob_relative_path = "yellow"
# NOTE(review): the SAS token is hard-coded in the notebook. "r" is presumably
# a placeholder rather than a real secret -- confirm before sharing, and load
# any real token from a secret store instead.
blob_sas_token = "r"

# Allow Spark to read from Blob storage remotely: build the wasbs:// URL and
# register the SAS token under the container/account-specific config key.
# `spark` is not defined in this notebook; presumably it is injected by the
# notebook runtime -- confirm when running elsewhere.
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set(
  'fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name),
  blob_sas_token)
print('Remote blob path: ' + wasbs_path)

# Define the DataFrame over the Parquet files. Per the original note, no data
# is loaded at this point. `yellowtaxis` is read later by load_data().
yellowtaxis = spark.read.parquet(wasbs_path)
print('Register the DataFrame as a SQL temporary view: source')
yellowtaxis.createOrReplaceTempView('source')

# Display top 10 rows by querying the temporary view registered above.
# `display` is likewise not defined here; presumably a runtime-provided helper.
print('Displaying top 10 rows: ')
display(spark.sql('SELECT * FROM source LIMIT 10'))

Remote blob path: wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellow
Register the DataFrame as a SQL temporary view: source
Displaying top 10 rows: 


vendorID,tpepPickupDateTime,tpepDropoffDateTime,passengerCount,tripDistance,puLocationId,doLocationId,startLon,startLat,endLon,endLat,rateCodeId,storeAndFwdFlag,paymentType,fareAmount,extra,mtaTax,improvementSurcharge,tipAmount,tollsAmount,totalAmount,puYear,puMonth
CMT,2012-02-29T23:53:14.000+0000,2012-03-01T00:00:43.000+0000,1,2.1,,,-73.980494,40.730601,-73.983532,40.752311,1,N,CSH,7.3,0.5,0.5,,0.0,0.0,8.3,2012,3
VTS,2012-03-17T08:01:00.000+0000,2012-03-17T08:15:00.000+0000,1,11.06,,,-73.986067,40.699862,-73.814838,40.737052,1,,CRD,24.5,0.0,0.5,,4.9,0.0,29.9,2012,3
CMT,2012-02-29T23:58:51.000+0000,2012-03-01T00:15:48.000+0000,1,3.4,,,-73.968967,40.754359,-73.957048,40.743289,1,N,CRD,12.5,0.5,0.5,,1.5,0.0,15.0,2012,3
CMT,2012-03-01T19:24:16.000+0000,2012-03-01T19:31:22.000+0000,1,1.3,,,-73.99374,40.75307,-74.005428,40.741118,1,N,CRD,6.1,1.0,0.5,,0.0,0.0,7.6,2012,3
CMT,2012-02-29T23:46:32.000+0000,2012-03-01T00:05:18.000+0000,3,2.0,,,-73.973723,40.752323,-73.948275,40.769413,1,N,CSH,11.7,0.5,0.5,,0.0,0.0,12.7,2012,3
VTS,2012-03-07T15:17:00.000+0000,2012-03-07T15:26:00.000+0000,5,1.87,,,-73.988237,40.75929,-73.97114,40.78275,1,,CSH,7.7,0.0,0.5,,0.0,0.0,8.2,2012,3
CMT,2012-02-29T23:41:58.000+0000,2012-03-01T00:02:29.000+0000,1,12.4,,,-73.954536,40.727742,-73.768994,40.760246,1,N,CSH,28.5,0.5,0.5,,0.0,0.0,29.5,2012,3
VTS,2012-03-18T15:21:00.000+0000,2012-03-18T15:32:00.000+0000,6,2.51,,,-74.001705,40.732345,-73.974888,40.750835,1,,CSH,8.9,0.0,0.5,,0.0,0.0,9.4,2012,3
CMT,2012-02-29T23:47:08.000+0000,2012-03-01T00:06:42.000+0000,4,6.3,,,-73.992319,40.724503,-73.923589,40.76113,1,N,CRD,16.5,0.5,0.5,,4.37,0.0,21.87,2012,3
VTS,2012-03-13T22:26:00.000+0000,2012-03-13T22:37:00.000+0000,1,1.34,,,-74.009907,40.706292,-74.000512,40.71733,1,,CSH,7.3,0.5,0.5,,0.0,0.0,8.3,2012,3


In [0]:
# File location and type
# NOTE(review): the path is hard-coded to one uploaded file
# ("taxi_zone_lookup-3.csv"); confirm it exists in the target workspace.
file_location = "/FileStore/tables/taxi_zone_lookup-3.csv"
file_type = "csv"

# CSV options (passed to the reader as strings)
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
# `lookupexample` is read later by load_data() as the zone lookup table.
lookupexample = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

# Render the loaded lookup table in the notebook output.
display(lookupexample)

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


In [0]:
import pandas 
import numpy

In [0]:
# Echo the `sc` object as the cell output. `sc` is not defined anywhere in
# this notebook; presumably it is the SparkContext injected by the runtime.
sc

In [0]:
#### DO NOT CHANGE ANYTHING IN THIS CELL ####

from pyspark.sql.functions import col

def load_data():
    """Return the trips and zone-lookup DataFrames defined in earlier cells.

    Reads the notebook-level globals ``yellowtaxis`` and ``lookupexample``,
    so the cells that create them must have been run first.

    Returns:
        tuple: ``(trips, lookup)`` -- the same objects as ``yellowtaxis`` and
        ``lookupexample``, in that order.
    """
    trips = yellowtaxis
    
    # count() is called on the full trips DataFrame purely for this log line.
    print("Trip Count: ", trips.count())
    lookup = lookupexample
    return trips, lookup

def main():
    """Run the whole pipeline and print the final table.

    Calls, in order: user(), load_data(), long_trips(), manhattan_trips(),
    weighted_profit() and final_output(). Those functions are defined in
    other cells of this notebook and must exist before main() is called.
    """
    # Runs your functions implemented above.
    
    print(user())
    trips, lookup = load_data()
    # `trips` is rebound to the filtered result, so every later step works on
    # the output of long_trips(), not on the raw data.
    trips = long_trips(trips)
    mtrips = manhattan_trips(trips, lookup)
    wp = weighted_profit(trips, mtrips)
    final = final_output(wp,lookup)
    
    # Outputs the results for you to visually see
    final.show()
    
    # Writes out as a CSV to your bucket.
    # NOTE(review): disabled; `bucket` is not defined anywhere in the visible notebook.
    #final.write.csv(bucket)

In [0]:
def user():
    """Return the author's username as a string (printed first by main())."""
    username = 'Rafael Martínez Alarcón'
    return username

In [0]:
def long_trips(trips):
    """Keep only the trips whose ``tripDistance`` is at least 2.

    Args:
        trips: DataFrame with a ``tripDistance`` column.

    Returns:
        DataFrame with the same schema as ``trips``, restricted to rows
        where ``tripDistance >= 2``.
    """
    # where() is an alias of filter(); the predicate is unchanged.
    is_long = col("tripDistance") >= 2
    return trips.where(is_long)

In [0]:
from pyspark.sql import functions as F
def manhattan_trips(trips, lookup):
    """Return the 20 Manhattan drop-off locations with the most trips.

    Args:
        trips: DataFrame with a ``doLocationId`` column.
        lookup: DataFrame with ``LocationID`` and ``Borough`` columns.

    Returns:
        Cached DataFrame with columns ``doLocationId`` and ``pcount``,
        ordered by ``pcount`` descending and limited to 20 rows.
    """
    # Location ids whose borough is exactly "Manhattan".
    in_manhattan = F.col("Borough") == "Manhattan"
    manhattan_ids = lookup.where(in_manhattan).select("LocationID")

    # Inner join keeps only the trips that were dropped off at one of those ids.
    dropoff_in_manhattan = trips["doLocationId"] == manhattan_ids["LocationID"]
    dropoff_counts = (
        trips.join(manhattan_ids, dropoff_in_manhattan, how="inner")
        .groupBy("doLocationId")
        # NOTE(review): pcount is the number of trip rows per drop-off, not a
        # sum of passengerCount -- confirm this is the intended definition.
        .agg(F.count("*").alias("pcount"))
    )

    top_dropoffs = dropoff_counts.orderBy(F.desc("pcount")).limit(20)
    # Cached as in the original; the caller joins this small frame against trips.
    return top_dropoffs.cache()

In [0]:
from pyspark.sql import functions as F
def weighted_profit(trips, mtrips): 
    """Compute a weighted profit per pickup location.

    For every non-null ``puLocationId`` in ``trips``::

        weighted_profit = avg(totalAmount)
                          * (trips ending at a location in mtrips / all trips)

    Args:
        trips: DataFrame with ``puLocationId``, ``doLocationId`` and
            ``totalAmount`` columns.
        mtrips: DataFrame with a ``doLocationId`` column listing the popular
            drop-off locations (as returned by manhattan_trips()).

    Returns:
        DataFrame with columns ``puLocationId`` and ``weighted_profit``.
        Pickup locations with no trip to a popular drop-off get 0 rather
        than NULL.
    """
    # Rows without a pickup location never reached the result in the original
    # (its inner join on puLocationId could not match a NULL key); drop them
    # up front so that behaviour is preserved.
    trips_with_pickup = trips.where(F.col("puLocationId").isNotNull())

    # One aggregation yields both statistics, replacing two separate groupBy
    # passes that were joined back together on the same key.
    pickup_stats = trips_with_pickup.groupBy("puLocationId").agg(
        F.avg("totalAmount").alias("avg_total_amount"),
        F.count("*").alias("total_trip_count"),
    )

    # Join on the column *name*: mtrips is derived from trips, so the
    # expression trips["doLocationId"] == mtrips["doLocationId"] compares a
    # column with itself by lineage and only works through Spark's self-join
    # auto-resolution. Selecting just the key also avoids carrying a second
    # doLocationId/pcount pair through the join.
    popular_dropoffs = mtrips.select("doLocationId")
    count_popular_dropoffs = (
        trips_with_pickup.join(popular_dropoffs, "doLocationId", "inner")
        .groupBy("puLocationId")
        .agg(F.count("*").alias("count_popular_dropoffs"))
    )

    weighted_profit_df = (
        pickup_stats.join(count_popular_dropoffs, "puLocationId", "left")
        # The left join leaves NULL where a pickup location has no trips to a
        # popular drop-off; that means a count of 0, not an unknown value.
        .fillna(0, subset=["count_popular_dropoffs"])
        .withColumn(
            "proportion",
            F.col("count_popular_dropoffs") / F.col("total_trip_count"),
        )
        .withColumn(
            "weighted_profit",
            F.col("avg_total_amount") * F.col("proportion"),
        )
        .select("puLocationId", "weighted_profit")
    )

    return weighted_profit_df

In [0]:
def final_output(wp, lookup): 
    """Attach zone names to the weighted profits and return the top 20 zones.

    Args:
        wp: DataFrame with ``puLocationId`` and ``weighted_profit`` columns.
        lookup: DataFrame with ``LocationID``, ``Zone`` and ``Borough`` columns.

    Returns:
        DataFrame with columns ``Zone``, ``Borough`` and ``weighted_income``
        (the sum of ``weighted_profit`` per zone/borough pair), ordered by
        ``weighted_income`` descending and limited to 20 rows.

    NOTE(review): the original header comment named the third column
    ``weighted_profit``, but the code emits ``weighted_income`` -- confirm
    which name downstream consumers expect.
    """
    same_location = wp["puLocationId"] == lookup["LocationID"]
    income_by_zone = (
        wp.join(lookup, same_location, how="inner")
        .groupBy("Zone", "Borough")
        .agg(F.sum("weighted_profit").alias("weighted_income"))
    )
    return income_by_zone.orderBy("weighted_income", ascending=False).limit(20)

In [0]:
# Run the full pipeline; every cell above must have been executed first.
main() 


Rafael Martínez Alarcón
Trip Count:  1571671152
+--------------------+---------+------------------+
|                Zone|  Borough|   weighted_income|
+--------------------+---------+------------------+
|        Baisley Park|   Queens|31.205081273352373|
|       South Jamaica|   Queens| 29.71256744736692|
|Flushing Meadows-...|   Queens|23.662750316881112|
|     Randalls Island|Manhattan|22.793999713582004|
|             Jamaica|   Queens| 21.42248840989114|
|Springfield Garde...|   Queens|21.363622500312715|
|Briarwood/Jamaica...|   Queens|19.011793788951703|
|   LaGuardia Airport|   Queens|18.580626766045494|
|              Corona|   Queens| 18.36363783456637|
|         JFK Airport|   Queens| 18.04650717378282|
|        Astoria Park|   Queens|17.088652456501677|
|         Jamaica Bay|   Queens|15.050362596914402|
|             Maspeth|   Queens|13.049770868392038|
| Morningside Heights|Manhattan|  12.0643425344128|
|   Battery Park City|Manhattan|11.922662352806961|
|        Battery