In [None]:
from pyspark import SparkContext, SparkConf
import csv
from io import StringIO

# Spark 설정
conf = SparkConf().setAppName("NYC_Taxi_Analysis").setMaster("local[*]").set("spark.driver.bindAddress", "127.0.0.1") \
    .set("spark.driver.host", "127.0.0.1") \
    .set("spark.blockManager.port", "0") \
    .set("spark.driver.memory", "4g") \
    .set("spark.executor.memory", "4g")

sc = SparkContext.getOrCreate(conf=conf)

# 데이터 로드 함수
def load_data(file_path, file_format):
    if file_format == "parquet":
        raw_rdd = sc.binaryFiles(file_path)
    return raw_rdd

rdd = load_data("/Users/admin/Desktop/GitHub/softeer/과제/W5/M1/NYC_TLC_Trip_Data", "parquet")
print(rdd.take(5))

In [5]:
conf.exit()

AttributeError: 'SparkConf' object has no attribute 'exit'

[Stage 0:>                                                          (0 + 1) / 1]

In [6]:
sc.stop()

In [7]:
from pyspark import SparkContext, SparkConf
from io import StringIO
from pyspark.sql import SparkSession
import shutil
import os

# conf = SparkConf().setAppName("NYC_TAXI_Analysis").setMaster("local[*]")
# sc = SparkContext(conf=conf)

# def load_data(file_path, file_format) :
#     if file_format == "parquet" :
#         raw_rdd = sc.textFile(file_path)

spark = SparkSession.builder.appName("NYC_Taxi_Analysis").config("spark.driver.bindAddress", "127.0.0.1").getOrCreate()

def load_data(file_path, file_format) :
    if file_format == "parquet" :
        raw_df = spark.read.parquet(file_path)
        raw_rdd = raw_df.rdd
    # 데이터프레임 타입 에러 확인
    return raw_rdd

def clean_data(rdd) :
    rdd = rdd.filter(lambda row: row.tpep_pickup_datetime is not None)
    rdd = rdd.filter(lambda row: row.total_amount is not None)
    rdd = rdd.filter(lambda row: row.trip_distance is not None)
    return rdd

def transform_data(rdd) :
    return rdd.map(lambda row: (row.tpep_pickup_datetime.strftime('%Y-%m-%d'), float(row.total_amount), float(row.trip_distance)))

def aggreagate_data(rdd) :
    # total_trips = rdd.count()
    # total_revenue = rdd.map(lambda x : x[1]).sum()
    # avg_trip_distance = rdd.map(lambda x : x[2]).mean()
    total_trips, total_revenue, total_trip_distance = rdd.aggregate(
        (0, 0.0, 0.0),
        lambda acc, x : (acc[0] + 1, acc[1] + x[1], acc[2] + acc[2]),
        lambda acc1, acc2 : (acc1[0] + acc2[0], acc1[1] + acc2[1] , acc1[2] + acc2[2])
        )
    
    avg_trip_distance = total_trip_distance / total_revenue

    daily_trips = rdd.map(lambda x : (x[0],1)).reduceByKey(lambda a, b : a + b).collect()
    daily_revenue = rdd.map(lambda x : (x[0],x[1])).reduceByKey(lambda a, b : a + b).collect()
    return total_trips, total_revenue, avg_trip_distance, daily_trips, daily_revenue

def print_results(results) :
    total_trips, total_revenue, avg_trip_distance, daily_trips, daily_revenue = results
    print("Total trips :", total_trips)
    print("Total Revenue :", total_revenue)
    print("Average Trip Distance :", avg_trip_distance)

def save_results(results, output_path) :
    if os.path.exists(output_path) :
        shutil.rmtree(output_path)

    total_trips, total_revenue, avg_trip_distance, daily_trips, daily_revenue = results
    total_df = spark.createDataFrame([
        ("Total Trips", total_trips),
        ("Total Revenue", total_revenue),
        ("Average Trip Distance", avg_trip_distance)
    ], ["Metric", "Value"])

    trips_df = spark.createDataFrame(daily_trips, ["Date", "Trips"])
    revenue_df = spark.createDataFrame(daily_revenue, ["Date", "Revenue"])

    daily_df = trips_df.join(revenue_df, on="Date", how="inner")

    total_df.coalesce(1).write.csv(f"{output_path}/total_results.csv", mode="overwrite", header=True)
    daily_df.coalesce(1).write.csv(f"{output_path}/daily_results.csv", mode="overwrite", header=True)


if __name__ == "__main__" :
    raw_rdd = load_data("./NYC_TLC_Trip_Data/yellow_tripdata_2022-01.parquet", "parquet")
    cleaned_rdd = clean_data(raw_rdd)
    transformed_rdd = transform_data(cleaned_rdd)
    results = aggreagate_data(transformed_rdd)
    print_results(results)
    save_results(results, "./output")

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/admin/Desktop/GitHub/softeer/과제/W5/NYC_TLC_Trip_Data/yellow_tripdata_2022-01.parquet.

In [8]:
spark.stop()