뉴욕 택시 데이터 활용
- `fhvhv_tripdata_2020-03.csv` : 2020년 3월 뉴욕 택시 데이터
    - `hvfhs_license_num` : 차량 호출 서비스 회사(HVFHS)의 TLC 라이선스 번호 (예: HV0003 = Uber, HV0005 = Lyft)
    - `dispatching_base_num` : 해당 운행을 배차한 베이스(영업소)의 TLC 라이선스 번호
    - `pickup_datetime` :  승객을 태운 시간
    - `dropoff_datetime` :  승객이 하차한 시간
    - `PULocationID` : 승객이 승차한 지역 ID
    - `DOLocationID` : 승객이 하차한 지역 ID

In [1]:
from pyspark.sql import SparkSession

# Name the application, then reuse an already-running session or start a new one.
session_builder = SparkSession.builder.appName("trip_count_sql")
spark = session_builder.getOrCreate()

In [2]:
# Local directory the CSV files are read from.
directory = "/home/lab26/SparkCourse/data"
# File name of the trip data, resolved relative to `directory`.
filename = "fhvhv_tripdata_2020-03.csv"

In [3]:
# Read the trip CSV from the local filesystem (file:// scheme). The first line
# supplies the column names and Spark infers each column's type from the data.
trip_csv_uri = f"file:///{directory}/{filename}"
datas = spark.read.csv(trip_csv_uri, header=True, inferSchema=True)

# Preview the first five rows.
datas.show(5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   null|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+

In [4]:
# Register the trip DataFrame as the temp view "mobility_data" so that SQL
# queries can refer to it by that name.
datas.createOrReplaceTempView("mobility_data")

In [8]:
# Sanity check: fetch five rows from the registered view through Spark SQL.
query = """
select *
from mobility_data
limit 5
"""

preview_rows = spark.sql(query)
preview_rows.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2020-03-01 00:03:40|2020-03-01 00:23:39|          81|         159|   null|
|           HV0005|              B02510|2020-03-01 00:28:05|2020-03-01 00:38:57|         168|         119|   null|
|           HV0003|              B02764|2020-03-01 00:03:07|2020-03-01 00:15:04|         137|         209|      1|
|           HV0003|              B02764|2020-03-01 00:18:42|2020-03-01 00:38:42|         209|          80|   null|
|           HV0003|              B02764|2020-03-01 00:44:24|2020-03-01 00:58:44|         256|         226|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+

In [12]:
# Count trips per pickup date (year-month-day). The inner query keeps only the
# part of pickup_datetime before the first space, i.e. the date.
query = """
select pickup_date, count(*) as trips

from (select split(pickup_datetime,' ')[0] as pickup_date
      from mobility_data)

group by pickup_date
"""

trips_per_day = spark.sql(query)
trips_per_day.show()

+-----------+------+
|pickup_date| trips|
+-----------+------+
| 2020-03-03|697880|
| 2020-03-06|872012|
| 2020-03-05|731165|
| 2020-03-02|648986|
| 2020-03-04|707879|
| 2020-03-10|592727|
| 2020-03-01|784246|
| 2020-03-09|628940|
| 2020-03-08|731222|
| 2020-03-07|886071|
+-----------+------+



In [13]:
# Inspect the execution plan of the query currently held in `query`.
# explain(True) prints the extended output, including the parsed, analyzed
# and optimized logical plans.
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Aggregate ['pickup_date], ['pickup_date, 'count(1) AS trips#218]
+- 'SubqueryAlias __auto_generated_subquery_name
   +- 'Project ['split('pickup_datetime,  )[0] AS pickup_date#217]
      +- 'UnresolvedRelation [mobility_data], [], false

== Analyzed Logical Plan ==
pickup_date: string, trips: bigint
Aggregate [pickup_date#217], [pickup_date#217, count(1) AS trips#218L]
+- SubqueryAlias __auto_generated_subquery_name
   +- Project [split(pickup_datetime#18,  , -1)[0] AS pickup_date#217]
      +- SubqueryAlias mobility_data
         +- Relation[hvfhs_license_num#16,dispatching_base_num#17,pickup_datetime#18,dropoff_datetime#19,PULocationID#20,DOLocationID#21,SR_Flag#22] csv

== Optimized Logical Plan ==
Aggregate [pickup_date#217], [pickup_date#217, count(1) AS trips#218L]
+- Project [split(pickup_datetime#18,  , -1)[0] AS pickup_date#217]
   +- Relation[hvfhs_license_num#16,dispatching_base_num#17,pickup_datetime#18,dropoff_datetime#19,PULocationID#20,DOLocati

In [14]:
# File name of the zone lookup table (LocationID -> Borough / Zone / service_zone).
zone_file = "taxi+_zone_lookup.csv"

In [15]:
# Read the zone lookup CSV from the local filesystem (file:// scheme). The first
# line supplies the column names and Spark infers each column's type.
zone_csv_uri = f"file:///{directory}/{zone_file}"
zone_data = spark.read.csv(zone_csv_uri, header=True, inferSchema=True)

# Preview the first five rows.
zone_data.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [16]:
# Register the lookup DataFrame as the temp view "zone_data" so that SQL
# queries can join against it by name.
zone_data.createOrReplaceTempView("zone_data")

In [18]:
# Count trips per pickup borough: match each trip's PULocationID to the zone
# lookup's LocationID, then group by the borough name.
query = """
select borough, count(*) as trips
from (select zone_data.Borough as borough
      from mobility_data
      join zone_data on mobility_data.PULocationID = zone_data.LocationID)
group by borough
"""

trips_per_borough = spark.sql(query)
trips_per_borough.show()

+-------------+-------+
|      borough|  trips|
+-------------+-------+
|       Queens|1284141|
|          EWR|    239|
|      Unknown|    452|
|     Brooklyn|1947344|
|Staten Island|  90582|
|    Manhattan|3005030|
|        Bronx| 953340|
+-------------+-------+



In [19]:
# Same per-borough count, restricted to a single company
# (rows whose hvfhs_license_num is 'HV0003').
query = """
select borough, count(*) as trips
from (select zone_data.Borough as borough
      from mobility_data
      join zone_data on mobility_data.PULocationID = zone_data.LocationID
      where mobility_data.hvfhs_license_num='HV0003')
      
group by borough
"""

hv0003_trips_per_borough = spark.sql(query)
hv0003_trips_per_borough.show()

+-------------+-------+
|      borough|  trips|
+-------------+-------+
|       Queens| 984782|
|          EWR|      2|
|      Unknown|    319|
|     Brooklyn|1451704|
|Staten Island|  76464|
|    Manhattan|1977260|
|        Bronx| 813495|
+-------------+-------+



In [20]:
# Inspect the execution plan of the query currently held in `query`.
# explain(True) prints the extended output, including the parsed and
# analyzed logical plans.
spark.sql(query).explain(True)

== Parsed Logical Plan ==
'Aggregate ['borough], ['borough, 'count(1) AS trips#319]
+- 'SubqueryAlias __auto_generated_subquery_name
   +- 'Project ['zone_data.Borough AS borough#318]
      +- 'Filter ('mobility_data.hvfhs_license_num = HV0003)
         +- 'Join Inner, ('mobility_data.PULocationID = 'zone_data.LocationID)
            :- 'UnresolvedRelation [mobility_data], [], false
            +- 'UnresolvedRelation [zone_data], [], false

== Analyzed Logical Plan ==
borough: string, trips: bigint
Aggregate [borough#318], [borough#318, count(1) AS trips#319L]
+- SubqueryAlias __auto_generated_subquery_name
   +- Project [Borough#241 AS borough#318]
      +- Filter (hvfhs_license_num#16 = HV0003)
         +- Join Inner, (PULocationID#20 = LocationID#240)
            :- SubqueryAlias mobility_data
            :  +- Relation[hvfhs_license_num#16,dispatching_base_num#17,pickup_datetime#18,dropoff_datetime#19,PULocationID#20,DOLocationID#21,SR_Flag#22] csv
            +- SubqueryAlias zone

In [21]:
# Stop the Spark session at the end of the notebook.
spark.stop()