In [1]:
import pandas as pd


In [2]:
import pandas as pd
import pyspark as ps
sc = ps.SparkContext()
from pyspark.sql import SparkSession


# Create a SparkSession
spark = SparkSession.builder.appName("MyApp").getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/30 19:31:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
locationIdToName = "./taxi+_zone_lookup.csv"
taxis = "./taxis"


# 1.

## Dataframes


In [4]:
# create a dataframe file from the csv file using pyspark
dff_locactions = spark.read.csv(locationIdToName, header=True, inferSchema=True)
dff_locactions.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [5]:
# create a dataframe file from the parquet file using pyspark
dff_taxis = spark.read.parquet(taxis)
dff_taxis.show(5)



+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-03-01 02:13:08|  2022-03-01 02:24:35|            1.0|          2.4|       1.0|                 N|          90|         209|           2|       10.0|  3.0|    0.5|       0.

## RDDs

In [6]:
rdd_taxis = dff_taxis.rdd
rdd_locations = dff_locactions.rdd

# Q1 & Q2

In [7]:
# create a view for taxis 
dff_taxis.createOrReplaceTempView("taxis")
# do the same for locations
dff_locactions.createOrReplaceTempView("locations")

In [8]:
import time
start = time.time()
# lets filter all taxis trips , that happend in march
marchTrips = spark.sql(
  """
  SELECT *
  FROM taxis 
  INNER JOIN locations ON taxis.DOLocationID = locations.LocationID
  WHERE tpep_pickup_datetime >= '2022-03-01' 
  AND tpep_pickup_datetime < '2022-04-01' and tpep_dropoff_datetime >= '2022-03-01' 
  AND tpep_dropoff_datetime < '2022-04-01'
  AND locations.Zone = "Battery Park"
  ORDER BY Tip_amount DESC
  limit 1
  """
  )

bestTipTripMarch = ((marchTrips).collect())
end = time.time()
print(bestTipTripMarch)
print('Time: ',end - start)

# filter the previus trips that 



[Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2022, 3, 17, 12, 27, 47), tpep_dropoff_datetime=datetime.datetime(2022, 3, 17, 12, 27, 58), passenger_count=1.0, trip_distance=0.0, RatecodeID=1.0, store_and_fwd_flag='N', PULocationID=12, DOLocationID=12, payment_type=1, fare_amount=2.5, extra=0.0, mta_tax=0.5, tip_amount=40.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=45.8, congestion_surcharge=2.5, airport_fee=0.0, LocationID=12, Borough='Manhattan', Zone='Battery Park', service_zone='Yellow Zone')]
Time:  1.9330899715423584


                                                                                

In [9]:
start = time.time()
highestTollFees = spark.sql(
  """
  SELECT MAX(Tolls_amount) as max_toll_fees_per_month, MONTH(tpep_pickup_datetime) as month, 
  MAX(tpep_pickup_datetime) as max_date
  FROM taxis
  where Tolls_amount > 0
  AND YEAR(tpep_pickup_datetime) = '2022'
  GROUP BY MONTH(tpep_pickup_datetime)
  """
  )
highestTollFees = ((highestTollFees).collect())
end = time.time()
print(highestTollFees)
print('Time: ',end - start)



[Row(max_toll_fees_per_month=235.7, month=3, max_date=datetime.datetime(2022, 3, 31, 23, 59, 50)), Row(max_toll_fees_per_month=911.87, month=4, max_date=datetime.datetime(2022, 4, 30, 23, 59, 39)), Row(max_toll_fees_per_month=95.0, month=2, max_date=datetime.datetime(2022, 2, 28, 23, 59, 55)), Row(max_toll_fees_per_month=800.09, month=6, max_date=datetime.datetime(2022, 6, 30, 23, 59, 52)), Row(max_toll_fees_per_month=29.55, month=7, max_date=datetime.datetime(2022, 7, 1, 2, 59, 49)), Row(max_toll_fees_per_month=813.75, month=5, max_date=datetime.datetime(2022, 5, 31, 23, 59, 55)), Row(max_toll_fees_per_month=193.3, month=1, max_date=datetime.datetime(2022, 1, 31, 23, 59, 48))]
Time:  1.135735034942627


                                                                                

# Q3

In [10]:
# group by every 15 days and find the average trip distance and the average trip cost
start = time.time()
avgTripDistanceAndCost = spark.sql(
    """
    SELECT AVG(Trip_distance) as avg_trip_distance, AVG(Total_amount) as avg_trip_cost,
    CASE 
        WHEN 
            EXTRACT(day FROM tpep_pickup_datetime) BETWEEN 1 AND 15 
            THEN DATE_TRUNC('month', tpep_pickup_datetime)
        ELSE DATE_ADD(DATE_TRUNC('month', tpep_pickup_datetime), 15)
    END AS period
    FROM taxis
    WHERE tpep_pickup_datetime >= '2022-01-01'
    AND tpep_pickup_datetime < '2022-12-31'
    AND PULocationID != DOLocationID
    GROUP BY period
    ORDER BY period DESC
    """
    )
avgTripDistanceAndCost = ((avgTripDistanceAndCost).collect())
end = time.time()
print(avgTripDistanceAndCost)
print('Time: ',end - start)

                                                                                

[Row(avg_trip_distance=3.924531002558172, avg_trip_cost=21.80540260689235, period=datetime.datetime(2022, 7, 1, 0, 0)), Row(avg_trip_distance=6.174138574511356, avg_trip_cost=22.331380641103525, period=datetime.datetime(2022, 6, 16, 0, 0)), Row(avg_trip_distance=6.315157336730177, avg_trip_cost=22.466305309343248, period=datetime.datetime(2022, 6, 1, 0, 0)), Row(avg_trip_distance=7.906694182348757, avg_trip_cost=22.771948777963715, period=datetime.datetime(2022, 5, 16, 0, 0)), Row(avg_trip_distance=6.249697852127242, avg_trip_cost=21.921570348909114, period=datetime.datetime(2022, 5, 1, 0, 0)), Row(avg_trip_distance=5.800344707645977, avg_trip_cost=21.428088376232783, period=datetime.datetime(2022, 4, 16, 0, 0)), Row(avg_trip_distance=5.679323077938295, avg_trip_cost=21.515559094583583, period=datetime.datetime(2022, 4, 1, 0, 0)), Row(avg_trip_distance=5.5569449358506535, avg_trip_cost=21.120920554171548, period=datetime.datetime(2022, 3, 16, 0, 0)), Row(avg_trip_distance=6.48048543405

In [11]:

display(rdd_locations.takeSample(False, 5, 1))
print('--------------')
display(rdd_taxis.take(1))

[Row(LocationID=247, Borough='Bronx', Zone='West Concourse', service_zone='Boro Zone'),
 Row(LocationID=248, Borough='Bronx', Zone='West Farms/Bronx River', service_zone='Boro Zone'),
 Row(LocationID=145, Borough='Queens', Zone='Long Island City/Hunters Point', service_zone='Boro Zone'),
 Row(LocationID=115, Borough='Staten Island', Zone='Grymes Hill/Clifton', service_zone='Boro Zone'),
 Row(LocationID=238, Borough='Manhattan', Zone='Upper West Side North', service_zone='Yellow Zone')]

--------------


[Row(VendorID=1, tpep_pickup_datetime=datetime.datetime(2022, 3, 1, 2, 13, 8), tpep_dropoff_datetime=datetime.datetime(2022, 3, 1, 2, 24, 35), passenger_count=1.0, trip_distance=2.4, RatecodeID=1.0, store_and_fwd_flag='N', PULocationID=90, DOLocationID=209, payment_type=2, fare_amount=10.0, extra=3.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=0.3, total_amount=13.8, congestion_surcharge=2.5, airport_fee=0.0)]

In [12]:
from datetime import datetime 
def get_period(dt):
    day = dt.day
    if day >= 1 and day <= 15:
        return datetime(dt.year, dt.month, 1)
    else:
        return datetime(dt.year, dt.month, 16)
def handleGroupValues(x,y):
    return (x[0]+y[0], x[1]+y[1], x[2]+y[2])

In [13]:
# do the same but in rdd format
start = time.time()
avgTripDistanceAndCostRdd_step1 = \
    rdd_taxis \
    .filter(lambda x: x.PULocationID != x.DOLocationID \
        and  x.tpep_dropoff_datetime.strftime("%Y-%m-%d") >= "2022-01-01" \
        and  x.tpep_dropoff_datetime.strftime("%Y-%m-%d") <= "2022-12-31" ) 
print('step 1 finished')
avgTripDistanceAndCostRdd_step2 = \
    avgTripDistanceAndCostRdd_step1 \
    .map(lambda x: (get_period(x.tpep_pickup_datetime), (x.trip_distance, x.total_amount, 1)))
print('step 2 finished')
avgTripDistanceAndCostRdd_step3 = avgTripDistanceAndCostRdd_step2 \
    .reduceByKey(handleGroupValues)
print('step 3 finished')
print
avgTripDistanceAndCostRdd4 = avgTripDistanceAndCostRdd_step3 \
    .mapValues(lambda x: (x[0]/x[2], x[1]/x[2])).cache()
print('step 4 finished')
avgTripDistanceAndCostRdd = avgTripDistanceAndCostRdd4 \
    .take(1)
    # (a, (b, c, d))
    # a - period - key
    # b,c,d - values - x
    # x[0] + y[0] - sum of trip distances
    # x[1] + y[1] - sum of trip costs
    # x[2] + y[2] - number of trips
    # x[0]/x[2] - average trip distance
    # x[1]/x[2] - average trip cost
end = time.time()
print('rdd finished')
print(avgTripDistanceAndCostRdd)
print('Time: ',end - start)

step 1 finished
step 2 finished
step 3 finished
step 4 finished


ERROR:root:KeyboardInterrupt while sending command.                 (2 + 6) / 8]
Traceback (most recent call last):
  File "/opt/homebrew/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/homebrew/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/homebrew/Cellar/python@3.10/3.10.9/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt

KeyboardInterrupt: 

In [18]:
from operator import add

def average(x, y):
    return (x[0] + y[0], x[1] + y[1])

def get_average(x):
    return x[0] / x[1]

rdd = sc.parallelize([("a", 2), ("a", 3), ("b", 4), ("b", 5), ("b", 6)])

# Calculate the sum and count of values for each key
sum_count_rdd = rdd.mapValues(lambda x: (x, 1)).reduceByKey(average)

# Calculate the average for each key
average_rdd = sum_count_rdd.mapValues(get_average)

# Output: [("a", 2.5), ("b", 5.0)]
print(average_rdd.collect())

[('a', 2.5), ('b', 5.0)]
