In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta import *
import pandas as pd
# start spark session

jars = "libs/hadoop-aws-3.3.4.jar,libs/postgresql-42.2.20.jar,libs/aws-java-sdk-bundle-1.12.260.jar"
builder = SparkSession.builder \
    .master("local[*]") \
    .appName('uber_analyst') \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "10") \
    .config("spark.shuffle.spill", "true") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
    .config("spark.jars", jars) \
    .config("spark.sql.debug.maxToStringFields", 1000)\
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
    .config("spark.hadoop.fs.s3a.endpoint", f"http://localhost:9000")\
    .config("spark.hadoop.fs.s3a.access.key", "admin")\
    .config("spark.hadoop.fs.s3a.secret.key", "123456789")\
    .config("spark.hadoop.fs.s3a.path.style.access", "true")\
    .config("spark.hadoop.fs.connection.ssl.enabled", "false")\
    .config(
                "spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"
            )\
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')\
    .config('spark.sql.warehouse.dir', f's3a://delta-uber/')

In [2]:
# Let the Delta helper add its packages (plus hadoop-aws) to the builder, then
# create the session or reuse one that is already running.
spark = configure_spark_with_delta_pip(
    builder,
    extra_packages=["org.apache.hadoop:hadoop-aws:3.3.4"],
).getOrCreate()

In [3]:
# Echo the session object as the cell's output to confirm it was created.
spark

In [4]:
# Bucket that holds the Delta tables, and the tables to expose to Spark SQL.
minio_bucket = "delta-uber"
list_table = [
    "fact_table",
    "datetime_dim",
    "pickup_location_dim",
    "dropoff_location_dim",
    "passenger_count_dim",
    "trip_distance_dim",
    "rate_code_dim",
    "payment_type_dim",
]

# Load each Delta table from the bucket and register it as a temporary view
# under its own name so the cells below can query it with spark.sql().
for table_name in list_table:
    df = spark.read.load(f"s3a://{minio_bucket}/uber/{table_name}", format="delta")
    df.createOrReplaceTempView(table_name)

print("Đã tạo view tạm thời cho tất cả các bảng")

Đã tạo view tạm thời cho tất cả các bảng


In [5]:
# Preview the payment-type dimension by view name instead of through `df`,
# which is only whatever the table-loading loop happened to assign last.
# NOTE(review): the recorded output of this cell shows a header but no rows -
# confirm payment_type_dim is populated, since the payment join depends on it.
spark.table("payment_type_dim").show()

+---------------+------------+-----------------+
|payment_type_id|payment_type|payment_type_name|
+---------------+------------+-----------------+
+---------------+------------+-----------------+



In [6]:
# Total number of trips (row count of the fact table).
total_trips_sql = "SELECT COUNT(*) as total_trips FROM fact_table"
total_trips = spark.sql(total_trips_sql)
total_trips.show()

+-----------+
|total_trips|
+-----------+
|     150000|
+-----------+



In [7]:
# Preview raw fact rows (show() prints the first 20 by default). Stored under
# its own name so the trip count held in `total_trips` is not overwritten.
fact_preview = spark.sql("SELECT * FROM fact_table")
fact_preview.show()

+--------+-----------+------------------+-------------------+------------------+----------------+------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|vendorid|datetime_id|pickup_location_id|dropoff_location_id|passenger_count_id|trip_distance_id|rate_code_id|payment_type_id|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+-----------+------------------+-------------------+------------------+----------------+------------+---------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       2|60129542144|       60129542144|        60129542144|       60129542144|     60129542144| 60129542144|    60129542144|       35.5|  0.5|    0.5|     11.04|         0.0|                  0.3|       47.84|
|       1|60129542145|       60129542145|        60129542145|       60129542145|     60129542145| 60129542145|    60129542145|        7.0|  0.5|    0.5|

In [None]:
# Total number of trips.
total_trips = spark.sql("SELECT COUNT(*) as total_trips FROM fact_table")
total_trips.show()

# Total revenue.
total_revenue = spark.sql("SELECT SUM(total_amount) as total_revenue FROM fact_table")
total_revenue.show()

# Average trip distance, computed per trip by joining through the fact table.
# Averaging the dimension on its own is only correct when it holds exactly one
# row per trip.
# NOTE(review): assumes trip_distance_dim exposes a trip_distance_id key like
# the other dimensions joined below - confirm against the table schema.
avg_distance = spark.sql("""
    SELECT AVG(t.trip_distance) as avg_distance
    FROM fact_table f
    JOIN trip_distance_dim t ON f.trip_distance_id = t.trip_distance_id
""")
avg_distance.show()

# Distribution of payment methods (trip count per payment type).
payment_distribution = spark.sql("""
    SELECT p.payment_type_name, COUNT(*) as count
    FROM fact_table f
    JOIN payment_type_dim p ON f.payment_type_id = p.payment_type_id
    GROUP BY p.payment_type_name
    ORDER BY count DESC
""")
payment_distribution.show()

# Total revenue by rate code.
revenue_by_rate_code = spark.sql("""
    SELECT r.rate_code_name, SUM(f.total_amount) as total_revenue
    FROM fact_table f
    JOIN rate_code_dim r ON f.rate_code_id = r.rate_code_id
    GROUP BY r.rate_code_name
    ORDER BY total_revenue DESC
""")
revenue_by_rate_code.show()

In [None]:
# Time-based analysis: average total amount and trip count per pickup hour.
hourly_sql = """
    SELECT d.pickup_hour, AVG(f.total_amount) as avg_fare, COUNT(*) as trip_count
    FROM fact_table f
    JOIN datetime_dim d ON f.datetime_id = d.datetime_id
    GROUP BY d.pickup_hour
    ORDER BY d.pickup_hour
"""
hourly_analysis = spark.sql(hourly_sql)
hourly_analysis.show()

# Geographic analysis: the ten pickup coordinates with the most trips, with
# the average total amount at each.
location_sql = """
    SELECT 
        p.pickup_latitude, p.pickup_longitude, 
        COUNT(*) as trip_count,
        AVG(f.total_amount) as avg_fare
    FROM fact_table f
    JOIN pickup_location_dim p ON f.pickup_location_id = p.pickup_location_id
    GROUP BY p.pickup_latitude, p.pickup_longitude
    ORDER BY trip_count DESC
    LIMIT 10
"""
location_analysis = spark.sql(location_sql)
location_analysis.show()