In [13]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
# Local Spark session using all available cores; getOrCreate() returns the
# already-running session if this cell is executed again in the same kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/03 14:36:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Record the Spark version the outputs below were produced with.
spark_version = spark.version
spark_version

'3.3.2'

In [3]:
# Explicit schema for the FHV trip CSVs, so column types are fixed up front
# instead of inferred. Every field is declared nullable.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ('dispatching_base_num', StringType()),
        ('pickup_datetime', TimestampType()),
        ('dropOff_datetime', TimestampType()),
        ('PUlocationID', IntegerType()),
        ('DOlocationID', IntegerType()),
        ('SR_Flag', StringType()),
        ('Affiliated_base_number', StringType()),
    ]
])

In [5]:
# Load the raw FHV trip CSVs using the explicit `schema` (no type inference).
# "compression" is a write-side CSV option and is ignored when reading;
# Spark picks the codec from the file extension instead (assumes the raw
# files are named *.gz — TODO confirm), so the option is omitted here.
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv("data/raw/fvh")
)

In [6]:
# Print the schema tree to confirm the explicit schema was applied to the CSVs.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [9]:
# Persist the raw FHV data as Parquet, repartitioned into 6 output files.
# The target lives outside data/raw/fvh on purpose. Spark's file sources also
# pick up files in subdirectories of an input path, so Parquet parts written
# under the CSV input directory would be read back as (garbage) CSV rows the
# next time data/raw/fvh is loaded.
# mode("overwrite") keeps the cell re-runnable; the default save mode raises
# an error when the target directory already exists.
df.repartition(6) \
    .write \
    .mode("overwrite") \
    .parquet("data/pq/fvh/")

                                                                                

In [11]:
# Column names of the raw FHV DataFrame, in schema order.
columns = list(df.columns)

In [35]:
# Trips picked up on 2019-10-15 (the date is taken in the Spark session time zone).
# pickup_datetime is already a TimestampType column, so to_date needs no
# format pattern. A pattern such as 'YYYY-MM-dd' would also be wrong, because
# 'Y' is the week-based year; the calendar year is 'y'.
# NOTE: despite its name, this is the filtered DataFrame, not a count.
number_of_trips = df.filter(to_date('pickup_datetime') == '2019-10-15')

In [39]:
# Materialise the filtered trips and count them (this triggers a Spark job).
trips_on_day = number_of_trips.count()
trips_on_day

                                                                                

62610

In [56]:
# Add trip_duration = dropoff time minus pickup time, in whole seconds
# (unix_timestamp works at second granularity). Column names use the schema's
# exact casing ("dropOff_datetime") instead of relying on case-insensitive
# resolution. All original columns are kept, so no select() is needed first.
df_trip_duration = df.withColumn(
    "trip_duration",
    unix_timestamp("dropOff_datetime") - unix_timestamp("pickup_datetime"),
)

In [86]:
# Longest trip duration in seconds across the whole dataset.
# `max` here is pyspark.sql.functions.max: the star import shadows the builtin.
longest_trip = df_trip_duration.agg(max("trip_duration")).head()[0]

                                                                                

In [87]:
# Express the longest trip duration in hours.
# NOTE(review): a value in the hundreds of thousands of hours points to bad
# timestamps in the raw data; consider filtering outliers before reporting.
SECONDS_PER_HOUR = 3600
longest_trip / SECONDS_PER_HOUR

631152.5

In [88]:
# Taxi zone lookup table; the header row supplies names and column types are inferred.
zone_df = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv("data/zone/")
)

In [89]:
# Preview the first 20 zone rows (long strings are truncated).
zone_df.show(n=20, truncate=True)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [92]:
# Expose both DataFrames to Spark SQL under short view names.
for view_name, frame in (('zone', zone_df), ('fvh', df)):
    frame.createOrReplaceTempView(view_name)

In [101]:
# Least frequent pickup zone: count trips per zone by joining fvh.PUlocationID
# to zone.LocationID, then keep the zone with the fewest pickups.
# - Trips with no matching zone (e.g. NULL PUlocationID) drop out of the inner join.
# - Ties on the count are broken alphabetically by Zone, so LIMIT 1 always
#   returns the same row instead of an arbitrary one.
# - The pickup_zone column holds the trip count; the name is kept for
#   compatibility with the displayed output.
df_result = spark.sql("""
    SELECT zone.Zone, COUNT(*) AS pickup_zone
    FROM fvh
    INNER JOIN zone ON zone.LocationID = fvh.PUlocationID
    GROUP BY zone.Zone
    ORDER BY pickup_zone ASC, Zone ASC
    LIMIT 1
""")

In [102]:
# Display the least frequent pickup zone and its trip count.
df_result.show(n=20, truncate=True)

[Stage 87:>                                                         (0 + 1) / 1]

+-----------+-----------+
|       Zone|pickup_zone|
+-----------+-----------+
|Jamaica Bay|          1|
+-----------+-----------+



                                                                                