### 파일 불러오기

In [1]:
%pyspark
spark.stop()


In [2]:
%pyspark

# 스파크 세션 생성
from pyspark.sql import SparkSession

# 인스턴스 생성(Max Memory 지정: Out of Memory 방지)
MAX_MEMORY="4g"
spark = SparkSession.builder.appName("taxi-fare")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .config("spark.memory.useLegacyMode", "true")\
                .config("spark.executor.instances", 2)\
                .getOrCreate()
                
sc = spark.sparkContext  

# 메모리 상태 확인
storage_info = sc.getConf().getAll()

for item in storage_info:
    print(item)


### 일일 시간대별 택시 선호도 

In [4]:
%pyspark
spark.catalog.clearCache()


In [5]:
%pyspark

#yellow_df = spark.read.parquet("/home/kim/data/NY_taxi/tripdata/yellow_tripdata/2021/yellow_tripdata_2021-01.parquet", inferSchema=True, header=True) # 여러개 파일 동시에 가져올 수 있다.
yellow_df = spark.read.parquet("/home/kim/data/NY_taxi/tripdata/yellow_tripdata/2021", inferSchema=True, header=True) # 여러개 파일 동시에 가져올 수 있다.

zone_df = spark.read.csv("/home/kim/data/NY_taxi/taxi_zone_lookup.csv", inferSchema=True, header=True)

weather_df = spark.read.csv("/home/kim/data/NYC_Weather_2016_2022.csv", inferSchema=True, header=True)


yellow_df.createOrReplaceTempView("yellow")
zone_df.createOrReplaceTempView("zone")

weather_df.createOrReplaceTempView("weather")
weather_df.select("cloudcover (%)").where(weather_df['time'] == "2021-01-02 06:00:00").show()


In [6]:
%pyspark
yellow_df.cache()
weather_df.cache()

In [7]:
%pyspark

query = """
SELECT 
    y.VendorID as vendor_id,
    TO_DATE(y.tpep_pickup_datetime) as pickup_date,
    TO_DATE(y.tpep_dropoff_datetime) as dropoff_date,
    HOUR(y.tpep_pickup_datetime) as pickup_time,
    HOUR(y.tpep_dropoff_datetime) as dropoff_time,
    y.passenger_count,
    y.trip_distance,
    y.fare_amount,
    y.tip_amount,
    y.tolls_amount,
    y.total_amount,
    y.payment_type,
    pz.Zone as pickup_zone,
    dz.Zone as dropoff_zone
FROM 
    yellow y
    LEFT JOIN 
        zone pz
    ON
        y.PULocationID = pz.LocationID
    LEFT JOIN
        zone dz
    ON 
        y.DOLocationID = dz.LocationID
"""
taxicomb_df = spark.sql(query)
taxicomb_df.unpersist()

# 새로운 TempView에 담는다.
taxicomb_df.createOrReplaceTempView("taxicomb")


In [8]:
%pyspark
taxicomb_df.cache()



### 날씨 데이터 전처리하기 



In [10]:
%pyspark

query = """
SELECT
    TO_DATE(w.time) as date,
    HOUR(w.time) as hour,
    w.`temperature_2m (°C)` as temperature,
    w.`rain (mm)` as rain,
    w.`cloudcover (%)` as cloudcover
FROM 
    weather w
WHERE TO_DATE(w.time) > '2021-01-01' AND TO_DATE(w.time) < '2021-02-01'
"""
weather_df= spark.sql(query)
weather_df.createOrReplaceTempView("weather")


weather_df.show()

### weather랑 join하기 

In [12]:
%pyspark
import time
from pyspark import StorageLevel
from pyspark.sql import functions as F

query = """
SELECT
    *
FROM 
    taxicomb tc
INNER JOIN
    weather w
ON
    tc.pickup_time = w.hour AND tc.pickup_date = w.date
    
"""
comb_df= spark.sql(query)


#comb_df.persist(StorageLevel.DISK_ONLY)
comb_df.cache()
#comb_df.unpersist()


In [13]:
%pyspark

start = time.time()

comb_df.show()

end = time.time()
ㄴ
elapsed_time = end - start
print(f"경과 시간: {elapsed_time:.2f}초")

# 캐싱 여부 확인
print(f"comb_df is cached? : {comb_df.is_cached}")  # 캐싱 여부 및 저장 수준 확인


In [14]:
%pyspark
start = time.time()


# 시간대별, 픽업 존별로 데이터를 그룹화
#grouped_df = comb_df.groupBy("pickup_time").agg(F.sum("pickup_count").alias("total_count"))
grouped_df = comb_df.groupBy("pickup_time").agg(
    F.count("*").alias("trip_count"),        # 운행 횟수
    F.sum("total_amount").alias("total_fare"),  # 총 수입
    F.avg("tip_amount").alias("avg_tip"),     # 평균 팁
    F.sum("trip_distance").alias("total_distance")  # 총 운행 거리
)

end = time.time()

elapsed_time = end - start
print(f"경과 시간: {elapsed_time:.2f}초")


In [15]:
%pyspark

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

plt.rc('font', family='NanumGothic')
matplotlib.rcParams['axes.unicode_minus'] = False

# Pandas로 변환
pandas_df = grouped_df.toPandas()

# pickup_time을 기준으로 정렬
pandas_df = pandas_df.sort_values(by="pickup_time")

In [16]:
%pyspark

# 꺾은선 그래프 그리기
plt.figure(figsize=(10, 6))

# 총 운행 거리
plt.plot(pandas_df["pickup_time"], pandas_df["total_distance"], marker="o", label="총 거리")

# 총 운임
plt.plot(pandas_df["pickup_time"], pandas_df["total_fare"], marker="o", label="총 요금량")

# 운행 횟수
plt.plot(pandas_df["pickup_time"], pandas_df["trip_count"], marker="o", label="이용 수")

# 평균 팁
plt.plot(pandas_df["pickup_time"], pandas_df["avg_tip"], marker="o", label="평균 팁")

# 그래프 타이틀 및 레이블
plt.title("일일 택시 사용량")
plt.xlabel("픽업 시간")
plt.ylabel("사용량")
plt.legend()

# 그래프 보여주기
plt.grid(True)
plt.tight_layout()
plt.show()

In [17]:
%pyspark

query = """
SELECT
    *
FROM 
    taxicomb tc
INNER JOIN
    weather w
ON
    tc.pickup_time = w.hour AND tc.pickup_date = w.date;
"""

comb_df = spark.sql(query)



특정 시간대의 상위 3개


In [19]:
%pyspark

# PySpark 환경에서 데이터프레임을 불러온다고 가정
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Spark session 생성
spark = SparkSession.builder.appName("stacked_bar_chart").getOrCreate()

query = """
WITH RankedPickups AS (
  SELECT
    pickup_time,
    pickup_zone,
    COUNT(*) AS pickup_count,
    ROW_NUMBER() OVER (PARTITION BY pickup_time ORDER BY COUNT(*) DESC) AS rank
  FROM taxicomb
  WHERE pickup_date = '2021-01-02'
  GROUP BY pickup_time, pickup_zone
)
SELECT
  pickup_time,
  pickup_zone,
  pickup_count
FROM RankedPickups
WHERE rank <= 1
ORDER BY pickup_time, pickup_count DESC;
"""
# 데이터프레임 불러오기 (여기서는 가정된 예시)
df = spark.sql(query)

# 시간대별, 픽업 존별로 데이터를 그룹화
grouped_df = df.groupBy("pickup_time", "pickup_zone").agg(F.sum("pickup_count").alias("total_count"))

# Pandas로 변환
pandas_df = grouped_df.toPandas()


In [20]:
%pyspark

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

plt.rc('font', family='NanumGothic')
matplotlib.rcParams['axes.unicode_minus'] = False

# 피벗 테이블로 변환하여 시간대별 구역 데이터를 준비
pivot_df = pandas_df.pivot(index='pickup_time', columns='pickup_zone', values='total_count')

# NaN 값을 0으로 변경 (픽업이 없을 경우)
pivot_df = pivot_df.fillna(0)

# 차트 그리기
pivot_df.plot(kind='bar', stacked=True, figsize=(10, 6))

# 차트 타이틀 및 레이블
plt.title('일일 택시 이용객의 선호 출발지')
plt.xlabel('이용 시간')
plt.ylabel('이용객 수')
plt.legend(title='Pickup Zone')

# 차트 보여주기
plt.tight_layout()
plt.show()

In [21]:
%pyspark

# PySpark 환경에서 데이터프레임을 불러온다고 가정
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Spark session 생성
spark = SparkSession.builder.appName("stacked_bar_chart").getOrCreate()

query = """
WITH RankedPickups AS (
  SELECT
    dropoff_time,
    dropoff_zone,
    COUNT(*) AS dropoff_count,
    ROW_NUMBER() OVER (PARTITION BY dropoff_time ORDER BY COUNT(*) DESC) AS rank
  FROM taxicomb
  WHERE dropoff_date = '2021-01-02'
  GROUP BY dropoff_time, dropoff_zone
)
SELECT
  dropoff_time,
  dropoff_zone,
  dropoff_count
FROM RankedPickups
WHERE rank <= 1
ORDER BY dropoff_time, dropoff_count DESC;
"""
# 데이터프레임 불러오기 (여기서는 가정된 예시)
df = spark.sql(query)

# 시간대별, 픽업 존별로 데이터를 그룹화
grouped_df = df.groupBy("dropoff_time", "dropoff_zone").agg(F.sum("dropoff_count").alias("total_count"))

# Pandas로 변환
pandas_df = grouped_df.toPandas()

In [22]:
%pyspark

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib

plt.rc('font', family='NanumGothic')
matplotlib.rcParams['axes.unicode_minus'] = False

# 피벗 테이블로 변환하여 시간대별 구역 데이터를 준비
pivot_df = pandas_df.pivot(index='dropoff_time', columns='dropoff_zone', values='total_count')

# NaN 값을 0으로 변경 (픽업이 없을 경우)
pivot_df = pivot_df.fillna(0)

# 차트 그리기
pivot_df.plot(kind='bar', stacked=True, figsize=(10, 6))

# 차트 타이틀 및 레이블
plt.title('일일 택시 이용객의 선호 도착지')
plt.xlabel('도착 시간')
plt.ylabel('도착 횟수')
plt.legend(title='Dropoff Zone')

# 차트 보여주기
plt.tight_layout()
plt.show()