In [None]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if one already exists, otherwise start one named "Data".
spark = SparkSession.builder.appName("Data").getOrCreate()

In [None]:
import os

# Folder holding the exam datasets. Set the TAXI_DATA_DIR environment variable to
# point elsewhere instead of editing a machine-specific absolute path; the default
# keeps the original location so existing runs resolve to the same files.
DATA_DIR = os.environ.get(
    'TAXI_DATA_DIR',
    r'C:\Users\yaw\Desktop\大三下\数据可视化\2023～2024第2学期数据可视化技术考试\数据集',
)
file1_path = os.path.join(DATA_DIR, '2022_Yellow_Taxi_Trip_Data.csv')  # trip records
file2_path = os.path.join(DATA_DIR, 'taxi_zones.csv')  # zone lookup with geometries

In [None]:
# Load the trip CSV (comma-delimited by default): first row is the header,
# column types are inferred from the data.
df1 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file1_path)
)
df1.show(truncate=False)

## 数据预处理
- 定义一个函数，将字符串转换为 datetime 对象
- 去除异常值（超出均值 ± 3 倍标准差的记录，以及乘客数不大于 0 的记录）


In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import TimestampType
from datetime import datetime

# Convert a trip timestamp string into a datetime object.
def parse_datetime(date_str):
    """Parse a timestamp string in the form '01/01/2022 12:35:38 AM'.

    Args:
        date_str: Timestamp text in '%m/%d/%Y %I:%M:%S %p' format, a value that
            is already a datetime, or None.

    Returns:
        A naive datetime. None is returned when the input is None or blank,
        so null cells become null timestamps instead of failing the Spark UDF.
        A value that is already a datetime is returned unchanged.
    """
    if date_str is None:
        return None
    if isinstance(date_str, datetime):
        return date_str
    text = date_str.strip()
    if not text:
        return None
    return datetime.strptime(text, '%m/%d/%Y %I:%M:%S %p')

# Wrap the parser as a Spark UDF that yields TimestampType values.
parse_datetime_udf = udf(parse_datetime, TimestampType())

# Replace both trip timestamp columns with their parsed form, pickup first.
for ts_column in ('tpep_pickup_datetime', 'tpep_dropoff_datetime'):
    df1 = df1.withColumn(ts_column, parse_datetime_udf(col(ts_column)))

# Show the converted DataFrame.
df1.show(truncate=False)

In [None]:
# Summary statistics (count, mean, stddev, min, max) for every column of df1.
df1_summary = df1.describe()
df1_summary.show()

In [None]:
# Print df1's schema (column names and types) after the timestamp conversion.
df1.printSchema()

In [None]:
from pyspark.sql.functions import col, expr

from functools import reduce

fields = ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']

# describe() returns one row per statistic label ('count', 'mean', 'stddev',
# 'min', 'max') with the values as strings; index those rows by label.
summary_rows = {row.summary: row for row in df1.describe(*fields).collect()}

# field -> (mean, stddev), still as the strings describe() produced
# (None when the column has no statistics).
mean_stddev = {
    field: (summary_rows['mean'][field], summary_rows['stddev'][field])
    for field in fields
}

# Keep rows whose value lies within mean ± 3 * stddev for every field. The
# conditions are Column expressions rather than a concatenated SQL string.
conditions = []
for field in fields:
    mean, stddev = mean_stddev[field]
    if mean is None or stddev is None:
        # No statistics for this column (e.g. every value is null): there is no
        # window to filter on, and float(None) would raise.
        continue
    min_value = float(mean) - 3 * float(stddev)
    max_value = float(mean) + 3 * float(stddev)
    # between() is inclusive on both ends, i.e. `field >= min AND field <= max`;
    # rows where the field is null do not satisfy it and are dropped.
    conditions.append(col(field).between(min_value, max_value))

filtered_df = df1.filter(reduce(lambda acc, cond: acc & cond, conditions)) if conditions else df1
filtered_df.show()

In [None]:
# Statistics after the 3-sigma filter, for comparison with df1's summary.
filtered_summary = filtered_df.describe()
filtered_summary.show()

In [None]:
# Drop trips whose passenger_count is not positive (0, negative or null).
filtered_df = filtered_df.where(col("passenger_count") > 0)
filtered_df.describe().show()

## 1 时间段 与 行程数 关系 heatmap calendar
- month
- dayOfWeek
- hour
- dayOfMonth

In [None]:
from pyspark.sql.functions import dayofweek, month, hour, dayofmonth, substring

# Derive calendar features from the parsed pickup timestamp.
pickup_ts = col('tpep_pickup_datetime')
df_with_datetime = (
    filtered_df
    .withColumn('pickup_dayOfWeek', dayofweek(pickup_ts))
    .withColumn('pickup_month', month(pickup_ts))
    .withColumn('pickup_dayOfMonth', dayofmonth(pickup_ts))
    .withColumn('pickup_hour', hour(pickup_ts))
    # First 10 characters of the timestamp rendered as a string (the date part).
    .withColumn('pickup_date', substring(pickup_ts, 1, 10))
)

In [None]:
# Preview the first rows (PySpark's default n=20, truncated cells).
df_with_datetime.show(n=20, truncate=True)

In [None]:
# Keep only the derived time columns and expose them to SQL as the
# `df_with_datetime` view queried by bar_chart.
time_columns = ['pickup_date', 'pickup_dayOfWeek', 'pickup_month', 'pickup_dayOfMonth', 'pickup_hour']
df_with_date = df_with_datetime.selectExpr(*time_columns)
df_with_date.createOrReplaceTempView("df_with_datetime")
df_with_date.show()

In [None]:
from pyecharts.charts import Bar
from pyecharts import options as opts

def bar_chart(colume, title = '柱状图', view = 'df_with_datetime'):
    """Render a bar chart of row counts for each distinct value of a column.

    Args:
        colume: Column of ``view`` to group by; its values become the x axis.
        title: Chart title, also used as the output file ``image/<title>.html``.
        view: Spark SQL temp view to query. Defaults to ``df_with_datetime``,
            the view this notebook registers for the pickup time features.
    """
    import os

    # Backticks quote the column identifier; the result column keeps its plain name.
    data = spark.sql(
        f"""
        select `{colume}`, count(*) as count
        from {view}
        group by `{colume}`
        order by `{colume}`
        """
    )
    data_df = data.toPandas()

    # Draw the bar chart with pyecharts.
    bar = (
        Bar()
        .add_xaxis(data_df[colume].tolist())
        .add_yaxis("Count", data_df['count'].tolist(),
                   label_opts=opts.LabelOpts(is_show=False))  # hide per-bar value labels
        .set_global_opts(
            title_opts=opts.TitleOpts(title=title),
            xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=45)),  # rotate x-axis labels
        )
    )

    # Make sure the output folder exists before writing the HTML file.
    os.makedirs('image', exist_ok=True)
    bar.render(f'image/{title}.html')

In [None]:
# One count-per-value chart per derived time column; the column name doubles as
# the chart title and output file name.
for time_column in ('pickup_dayOfMonth', 'pickup_dayOfWeek', 'pickup_hour', 'pickup_month'):
    bar_chart(time_column, time_column)

## 2 行程距离与总费用的关系

In [None]:
# Expose the cleaned trips to Spark SQL as `df` for the queries in later sections.
filtered_df.createOrReplaceTempView("df")
filtered_df.show(n=20, truncate=True)

In [None]:
# Average fare / total / tolls / improvement surcharge for each distinct
# trip_distance value, ordered by distance.
distance_amount_sql = '''
    select trip_distance
        , round(avg(fare_amount),2) as fare_avg_amount
        , round(avg(total_amount),2) as total_avg_amount
        , round(avg(tolls_amount),2) as tolls_avg_amount
        , round(avg(improvement_surcharge),2) as improvement_avg_amount
    from df
    group by trip_distance
    order by trip_distance
    '''
trip_distanceAndAmount = spark.sql(distance_amount_sql)

In [None]:
# Preview the per-distance averages (PySpark's default n=20, truncated cells).
trip_distanceAndAmount.show(n=20, truncate=True)

In [None]:
from pyecharts.charts import Scatter
from pyecharts import options as opts

# Bring the aggregated (distance -> average amounts) rows to the driver.
data_df = trip_distanceAndAmount.toPandas()

# Individual columns stay bound as notebook-level names.
trip_distance = data_df['trip_distance']
fare_avg_amount = data_df['fare_avg_amount']
total_avg_amount = data_df['total_avg_amount']
tolls_avg_amount = data_df['tolls_avg_amount']
improvement_avg_amount = data_df['improvement_avg_amount']

series_specs = [
    ("Fare Average Amount", fare_avg_amount),
    ("Total Average Amount", total_avg_amount),
    ("Tolls Average Amount", tolls_avg_amount),
    ("Improvement Average Amount", improvement_avg_amount),
]

# Distance on a numeric x axis (0..1213), one small-marker series per amount.
scatter = Scatter()
scatter.add_xaxis(trip_distance.tolist())
for series_label, series_values in series_specs:
    scatter.add_yaxis(series_label, series_values.tolist(), symbol_size=3,
                      label_opts=opts.LabelOpts(is_show=False))
scatter.set_global_opts(
    title_opts=opts.TitleOpts(
        title="Trip Distance vs Average Amounts",
        pos_left="center",
        pos_top="top",
        padding=[20, 0, 0, 0],
    ),
    legend_opts=opts.LegendOpts(orient="horizontal", pos_top="bottom", pos_left="center"),
    xaxis_opts=opts.AxisOpts(
        axislabel_opts=opts.LabelOpts(rotate=-15),
        max_=1213,
        min_=0,
        type_='value',
    ),
    yaxis_opts=opts.AxisOpts(name="Average Amount"),
)

# Write the chart to an HTML file.
scatter.render('image/scatter_chart.html')

In [None]:
from pyecharts.charts import Scatter
from pyecharts import options as opts

# Same chart as above, zoomed to distances 0..200.
data_df = trip_distanceAndAmount.toPandas()

(trip_distance, fare_avg_amount, total_avg_amount,
 tolls_avg_amount, improvement_avg_amount) = (
    data_df[column_name]
    for column_name in ('trip_distance', 'fare_avg_amount', 'total_avg_amount',
                        'tolls_avg_amount', 'improvement_avg_amount')
)

amount_series = {
    "Fare Average Amount": fare_avg_amount,
    "Total Average Amount": total_avg_amount,
    "Tolls Average Amount": tolls_avg_amount,
    "Improvement Average Amount": improvement_avg_amount,
}

scatter = Scatter().add_xaxis(trip_distance.tolist())
for legend_name, amounts in amount_series.items():
    scatter.add_yaxis(legend_name, amounts.tolist(), symbol_size=3,
                      label_opts=opts.LabelOpts(is_show=False))

x_axis = opts.AxisOpts(
    axislabel_opts=opts.LabelOpts(rotate=-15),
    max_=200,
    min_=0,
    type_='value',
)
scatter.set_global_opts(
    title_opts=opts.TitleOpts(
        title="Trip Distance vs Average Amounts",
        pos_left="center",
        pos_top="top",
        padding=[20, 0, 0, 0],
    ),
    legend_opts=opts.LegendOpts(orient="horizontal", pos_top="bottom", pos_left="center"),
    xaxis_opts=x_axis,
    yaxis_opts=opts.AxisOpts(name="Average Amount"),
)

scatter.render('image/scatter_chart1.html')

In [None]:
# from pyecharts.charts import Line
# from pyecharts import options as opts

# # 将查询结果转换为 pandas DataFrame
# data_df = trip_distanceAndAmount.toPandas()

# trip_distance = data_df['trip_distance']
# fare_avg_amount = data_df['fare_avg_amount']
# total_avg_amount = data_df['total_avg_amount']
# tolls_avg_amount = data_df['tolls_avg_amount']
# improvement_avg_amount = data_df['improvement_avg_amount']

# # 创建一个 Line 对象
# line = (
#     Line()
#     .add_xaxis(trip_distance.tolist())  # 设置 x 轴数据
#     .add_yaxis("Fare Average Amount", fare_avg_amount.tolist(), 
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))  # 添加折线图数据系列
#     .add_yaxis("Total Average Amount", total_avg_amount.tolist(),
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))
#     .add_yaxis("Tolls Average Amount", tolls_avg_amount.tolist(),
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))
#     .add_yaxis("Improvement Average Amount", improvement_avg_amount.tolist(),
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))
#     .set_global_opts(
#         title_opts=opts.TitleOpts(
#             title="Trip Distance vs Average Amount",
#             pos_left="center",  # 标题水平居中
#             pos_top="top",     # 标题在顶部
#             padding=[20, 0, 0, 0]  # 调整标题与图表内容的间距
#         ),
#         legend_opts=opts.LegendOpts(orient="horizontal", pos_top="bottom", pos_left="center"),  # 调整图例位置
#         xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-15), max_=1213, min_=0, type_='value'),
#         yaxis_opts=opts.AxisOpts(name="Average Amount")
#     )
# )

# # 渲染图表到 HTML 文件中
# line.render('image/line_chart.html')  # 注意在 Windows 系统中使用双反斜杠作为路径分隔符

In [None]:
# from pyecharts.charts import Line
# from pyecharts import options as opts

# # 将查询结果转换为 pandas DataFrame
# data_df = trip_distanceAndAmount.toPandas()

# trip_distance = data_df['trip_distance']
# fare_avg_amount = data_df['fare_avg_amount']

# # 创建一个 Line 对象
# line = (
#     Line()
#     .add_xaxis(trip_distance.tolist())  # 设置 x 轴数据
#     .add_yaxis("Fare Average Amount", fare_avg_amount.tolist(), 
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))  # 添加折线图数据系列
#     .add_yaxis("Total Average Amount", total_avg_amount.tolist(),
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))
#     .add_yaxis("Tolls Average Amount", tolls_avg_amount.tolist(),
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))
#     .add_yaxis("Improvement Average Amount", improvement_avg_amount.tolist(),
#         symbol="emptyCircle",
#         is_symbol_show=True,
#         is_smooth=True,
#         label_opts=opts.LabelOpts(is_show=False))
#     .set_global_opts(
#         title_opts=opts.TitleOpts(
#             title="Trip Distance vs Average Amount",
#             pos_left="center",  # 标题水平居中
#             pos_top="top",     # 标题在顶部
#             padding=[20, 0, 0, 0]  # 调整标题与图表内容的间距
#         ),
#         legend_opts=opts.LegendOpts(orient="horizontal", pos_top="bottom", pos_left="center"),  # 调整图例位置
#         xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=-15), max_=120, min_=0, type_='value'),
#         yaxis_opts=opts.AxisOpts(name="Average Amount")
#     )
# )

# # 渲染图表到 HTML 文件中
# line.render('image/line_chart1.html')  # 注意在 Windows 系统中使用双反斜杠作为路径分隔符

## 3 支付方式与小费的关系

In [None]:
# Per payment_type: average tip, total tip and number of trips (count(1)).
payment_tip_sql = '''
    select payment_type 
         , round(avg(tip_amount), 4) tip_avg_amount
         , round(sum(tip_amount),4) tip_sum_amount
         , count(1) tip_count
    from df
    group by payment_type
    order by payment_type
    '''
payment_typeAndTip = spark.sql(payment_tip_sql)

payment_typeAndTip.show()

In [None]:
from pyecharts.charts import Pie

# Per-payment-type tip statistics as a pandas DataFrame (the next cells in this
# section also read `data_df`).
data_df = payment_typeAndTip.toPandas()
payment_types = data_df['payment_type']
tip_avg = data_df['tip_avg_amount']

# Rose (radius) chart: each slice's radius encodes the average tip of one payment type.
pie = (
    Pie()
    .add(
        series_name="Tip Avg Amount",  # previously mislabelled "Tip Count"
        data_pair=[list(z) for z in zip(payment_types, tip_avg)],
        rosetype="radius",
        # Show the average itself: a percentage share ({d}%) of a sum of
        # averages has no meaningful interpretation.
        label_opts=opts.LabelOpts(formatter="{b}: {c}"),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="Payment Type Tip Avg Amount"),
        legend_opts=opts.LegendOpts(orient="vertical", pos_top="15%", pos_left="2%"),
    )
)

# Write the chart to an HTML file.
pie.render('image/pie_chart_avg.html')

In [None]:
from pyecharts.charts import Pie
from pyecharts import options as opts

# Rebuild the frame from the Spark result instead of reading the global
# `data_df`, which section 6 later rebinds to the passenger table; local names
# also avoid clobbering `categories`, a Spark DataFrame used in section 4.
payment_tip_df = payment_typeAndTip.toPandas()
payment_types = payment_tip_df['payment_type']
trip_counts = payment_tip_df['tip_count']  # count(1): number of trips per payment type

# Donut chart: the inner/outer radius pair hollows out the pie.
pie = (
    Pie(init_opts=opts.InitOpts(width="800px", height="600px"))
    .add(
        series_name="Tip Count",
        data_pair=[list(z) for z in zip(payment_types, trip_counts)],
        radius=["40%", "60%"],
        label_opts=opts.LabelOpts(formatter="{b}: {d}%"),
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="Payment Type Tip Count"),
        legend_opts=opts.LegendOpts(orient="vertical", pos_top="15%", pos_left="2%"),
    )
)

# Write the chart to an HTML file.
pie.render('image/pie_chart_count.html')

In [None]:
from pyecharts.charts import Pie
from pyecharts import options as opts

# Read the payment statistics directly from the Spark result; the global
# `data_df` is rebound by later sections, and `categories` is reused in
# section 4 as a Spark DataFrame, so neither is touched here.
payment_tip_df = payment_typeAndTip.toPandas()
payment_types = payment_tip_df['payment_type']
tip_sums = payment_tip_df['tip_sum_amount']

# Plain pie: share of the total tip amount per payment type.
pie = (
    Pie()
    .add(
        series_name="Payment Type",
        data_pair=[list(z) for z in zip(payment_types, tip_sums)],
        radius="75%",
        label_opts=opts.LabelOpts(
            position="outside",
            formatter="{b}: {d}%"
        ),
        tooltip_opts=opts.TooltipOpts(
            trigger="item",
            formatter="{a} <br/>{b} : {c} ({d}%)"
        )
    )
    .set_global_opts(
        title_opts=opts.TitleOpts(title="Payment Type Tip Sum Amount"),
        legend_opts=opts.LegendOpts(orient="vertical", pos_top="15%", pos_left="2%"),
    )
)

# Write the chart to an HTML file.
pie.render('image/pie_chart_sum.html')

## 4 起始和结束地点的分析

In [None]:
# Load the taxi zone lookup (queried below for LocationID, borough, zone,
# the_geom) and expose it to SQL as `df2`.
df2 = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file2_path)
)
df2.createOrReplaceTempView("df2")
df2.show()

In [None]:
# Trip counts per (pickup zone, dropoff zone) pair, annotated with both boroughs.
# Inner joins drop pairs whose LocationID is absent from the zone lookup.
area_sql = '''
    with b as (
        select LocationID, borough
        from df2
    )
    select PULocationID, b1.borough PUborough, DOLocationID, b2.borough DOborough, value
    from (
        select PULocationID, DOLocationID, count(1) as value
        from df
        group by PULocationID, DOLocationID
    ) t inner join b b1 on t.PULocationID = b1.LocationID
    inner join b b2 on t.DOLocationID = b2.LocationID
    '''
area = spark.sql(area_sql)

area.createOrReplaceTempView('area')
area.show()

In [None]:
# ECharts graph `categories` array as a JSON string: one {"name": <borough>}
# object per distinct borough. sort_array fixes the order (and so the colour
# each borough receives); collect_set alone returns elements in arbitrary order.
categories = spark.sql(
    '''
    with borough as (
        select borough
        from df2
        group by borough
    ),
    t as (
        select concat('{"name":"', borough, '"}') j
        from borough
    )
    select concat('[', concat_ws(',', sort_array(collect_set(j))), ']') categories
    from t
    '''
)

In [None]:
# ECharts graph `links` array as a single JSON string: one {"source", "target"}
# object per (pickup, dropoff) LocationID pair in `area`.
links_sql = """
    WITH t AS (
        SELECT concat('{
                            "source": "', PULocationID, '", 
                            "target": "', DOLocationID, '"
                        }'
                    ) AS j
        FROM area
    )
    SELECT concat('[', concat_ws(',', collect_list(j)), ']') AS links
    FROM t
    """
links = spark.sql(links_sql)

In [None]:
# ECharts graph `nodes` array as a single JSON string. Each zone that appears in
# `area` becomes one node:
#   value      - its trip count as pickup plus as dropoff,
#   symbolSize - that count as a percentage of the sum over all nodes, rounded up,
#   category   - its borough name.
# `draggable` and `label.normal.show` are JSON booleans. The former quoted
# "False"/"True" strings are both truthy in JavaScript, which left nodes draggable.
nodes = spark.sql(
    """
    with area1 AS (
        SELECT LocationID, borough, total_quantity,
            ceil(100 * total_quantity / sum(total_quantity) over(rows between unbounded preceding and unbounded following)) symbolSize
        FROM (
            SELECT LocationID, borough, SUM(quantity) as total_quantity
            FROM (
                SELECT PULocationID LocationID, PUborough borough, SUM(value) AS quantity FROM area GROUP BY PULocationID, PUborough
                union all
                SELECT DOLocationID LocationID, DOborough borough, SUM(value) AS quantity FROM area GROUP BY DOLocationID, DOborough
            ) t0
            group by LocationID, borough
        ) t1
    )
    , t AS (
    SELECT concat(
        '{
                "name": "', LocationID,'",
                "symbolSize":', symbolSize,',
                "draggable": false,
                "value":', total_quantity ,',
                "category": "', borough ,'",
                "label": {
                    "normal": {
                        "show": true
                    }
                }
            }'
        ) AS j
        FROM area1
    )
    SELECT concat('[', concat_ws(',', collect_list(j)), ']') AS nodes
    FROM t
    """
)

In [None]:
nodes.createOrReplaceTempView("nodes")
links.createOrReplaceTempView("links")
categories.createOrReplaceTempView("categories")

# The graph cell reads this array by position: [0] nodes, [1] links,
# [2] categories. collect_list over a UNION ALL does not guarantee element
# order, so join the three single-row results and concatenate them explicitly
# in that order.
# NOTE(review): the name `json` shadows the stdlib module; it is kept because the
# next cell reads it.
json = spark.sql(
    '''
    select concat('[', n.nodes, ',', l.links, ',', c.categories, ']') as json
    from nodes n
    cross join links l
    cross join categories c
    '''
)

In [None]:
# `json` is the single-row Spark DataFrame built in the previous cell; its only
# column holds the serialized [nodes, links, categories] array.
json_content = json.collect()[0][0]

# Write the JSON text with an explicit encoding matching the utf-8 read in the
# graph cell (the platform default encoding on Windows is not utf-8).
with open("graph1.json", "w", encoding="utf-8") as f:
    f.write(json_content)

In [None]:
import json as jsonlib
from pyecharts import options as opts
from pyecharts.charts import Graph

# Load the [nodes, links, categories] array written by the previous cell.
# Names are prefixed with graph_ (and the module imported as jsonlib) so this
# cell does not rebind `json`, `nodes`, `links` or `categories`, which earlier
# cells hold as Spark DataFrames; re-running those cells afterwards would fail.
with open("graph1.json", "r", encoding="utf-8") as f:
    graph_data = jsonlib.load(f)
graph_nodes = graph_data[0]
graph_links = graph_data[1]
graph_categories = graph_data[2] if len(graph_data) > 2 else None

c = Graph()
c.add(
    "",
    nodes=graph_nodes,
    links=graph_links,
    categories=graph_categories,
    repulsion=50,
    label_opts=opts.LabelOpts(is_show=False),  # series-level labels off (nodes may override)
    linestyle_opts=opts.LineStyleOpts(curve=0.2),  # slightly curved edges
)
c.set_global_opts(
    legend_opts=opts.LegendOpts(is_show=True),  # show the borough legend
)

# Write the chart to an HTML file.
c.render("image/graph.html")

## 5 地理分析和可视化：分析不同区域的行程数量分布（上车、下车及净流入）。

In [None]:
# Pickup count per zone, joined with the zone lookup for names and WKT geometry.
pickup_zone_sql = '''
    select LocationID, borough, zone, the_geom, value
    from (
        select PULocationID, count(*) as value
        from df
        group by PULocationID
    ) t
    inner join df2 on df2.LocationID = t.PULocationID
    '''
PUL = spark.sql(pickup_zone_sql)

PUL.show()

In [None]:
import geopandas as gpd
from shapely import wkt
import matplotlib.pyplot as plt


# Collect the per-zone pickup rows (a list of Rows) to the driver.
PUL_dict = PUL.collect()

# Parse each zone's WKT with shapely and build a GeoDataFrame in EPSG:4326.
zone_records = [
    (r['LocationID'], r['borough'], r['zone'], wkt.loads(r['the_geom']), r['value'])
    for r in PUL_dict
]
PUL_gdf = gpd.GeoDataFrame(
    zone_records,
    columns=['LocationID', 'borough', 'zone', 'geometry', 'value'],
    geometry='geometry',
    crs="EPSG:4326",
)

# Choropleth of pickup counts.
fig, ax = plt.subplots(figsize=(10, 10))
PUL_gdf.plot(ax=ax, column='value', legend=True, cmap='OrRd')
plt.title('LocationID Heatmap')
plt.show()

In [None]:
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import GeoJSONDataSource, HoverTool, ColorBar, LinearColorMapper
from bokeh.transform import linear_cmap
from bokeh.palettes import Sunset11  # 选择一个颜色映射
from bokeh.tile_providers import CARTODBPOSITRON_RETINA


output_notebook()

# The basemap tiles use Web Mercator (EPSG:3857) while PUL_gdf is in lon/lat
# (EPSG:4326). Reproject the polygons before handing them to Bokeh; otherwise
# they are drawn in degrees near (0, 0), far from the New York tiles.
PUL_gdf_json = PUL_gdf.to_crs(epsg=3857).to_json()

p = figure(title="LocationID Heatmap", width=800, height=400,
           x_axis_type="mercator", y_axis_type="mercator")
p.add_tile(CARTODBPOSITRON_RETINA)

geosource = GeoJSONDataSource(geojson=PUL_gdf_json)

# One mapper drives both the patch colours and the colour bar, so they always agree.
mapper = LinearColorMapper(palette=Sunset11,
                           low=float(PUL_gdf['value'].min()),
                           high=float(PUL_gdf['value'].max()))

p.patches('xs', 'ys', source=geosource,
          fill_color={'field': 'value', 'transform': mapper},
          line_color=None, fill_alpha=0.7)

color_bar = ColorBar(color_mapper=mapper, width=8, location=(0, 0))
p.add_layout(color_bar, 'right')

hover = HoverTool(tooltips=[
    ("LocationID", "@LocationID"),
    ("Borough", "@borough"),
    ("Zone", "@zone"),
    ("Value", "@value"),
])

p.add_tools(hover)
show(p)

In [None]:
# Dropoff count per zone, joined with the zone lookup for names and WKT geometry.
dropoff_zone_sql = '''
    select LocationID, borough, zone, the_geom, value
    from (
        select DOLocationID, count(*) as value
        from df
        group by DOLocationID
    ) t
    inner join df2 on df2.LocationID = t.DOLocationID
    '''
DOL = spark.sql(dropoff_zone_sql)

DOL.show()

In [None]:
import geopandas as gpd
from shapely import wkt
import matplotlib.pyplot as plt

# Dropoff counts per zone (from DOL), collected to the driver as a list of Rows.
# The PUL_* names are reused so the following Bokeh cell picks this data up.
PUL_dict = DOL.collect()

# Parse each zone's WKT with shapely into a GeoDataFrame tagged as EPSG:4326.
dropoff_records = [
    (r['LocationID'], r['borough'], r['zone'], wkt.loads(r['the_geom']), r['value'])
    for r in PUL_dict
]
PUL_gdf = gpd.GeoDataFrame(
    dropoff_records,
    columns=['LocationID', 'borough', 'zone', 'geometry', 'value'],
    geometry='geometry',
    crs="EPSG:4326",
)

# Choropleth of dropoff counts.
fig, ax = plt.subplots(figsize=(10, 10))
PUL_gdf.plot(ax=ax, column='value', legend=True, cmap='OrRd')
plt.title('LocationID Heatmap')
plt.show()

In [None]:
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import GeoJSONDataSource, HoverTool, ColorBar, LinearColorMapper
from bokeh.transform import linear_cmap
from bokeh.palettes import Sunset11  # 选择一个颜色映射
from bokeh.tile_providers import CARTODBPOSITRON_RETINA

# Render Bokeh output inline in the notebook.
output_notebook()

# Tiles are Web Mercator (EPSG:3857); PUL_gdf is lon/lat (EPSG:4326).
# Reproject so the zone polygons line up with the basemap.
PUL_gdf_json = PUL_gdf.to_crs(epsg=3857).to_json()

p = figure(title="LocationID Heatmap", width=800, height=400,
           x_axis_type="mercator", y_axis_type="mercator")

# Basemap tiles.
p.add_tile(CARTODBPOSITRON_RETINA)

geosource = GeoJSONDataSource(geojson=PUL_gdf_json)

# A single linear mapper shared by the patches and the colour bar.
mapper = LinearColorMapper(palette=Sunset11,
                           low=float(PUL_gdf['value'].min()),
                           high=float(PUL_gdf['value'].max()))

p.patches('xs', 'ys', source=geosource,
          fill_color={'field': 'value', 'transform': mapper},
          line_color=None, fill_alpha=0.7)

color_bar = ColorBar(color_mapper=mapper, width=8, location=(0, 0))
p.add_layout(color_bar, 'right')

hover = HoverTool(tooltips=[
    ("LocationID", "@LocationID"),
    ("Borough", "@borough"),
    ("Zone", "@zone"),
    ("Value", "@value"),
])
p.add_tools(hover)

show(p)

In [None]:
# Net flow per zone: dropoff count minus pickup count (pickups are negated,
# dropoffs kept positive, then summed). Rebinds PUL, replacing the pickup-only
# result from earlier in this section.
net_flow_sql = '''
    select LocationID, borough, zone, the_geom, sum(value) value
    from (
        select LocationID, borough, zone, the_geom, -1 * value as value
        from (
            select PULocationID, count(*) as value
            from df
            group by PULocationID
        ) t
        inner join df2 on df2.LocationID = t.PULocationID
        union all
        select LocationID, borough, zone, the_geom, 1 * value as value
        from (
            select DOLocationID, count(*) as value
            from df
            group by DOLocationID
        ) t1
        inner join df2 on df2.LocationID = t1.DOLocationID
    ) t2
    group by LocationID, borough, zone, the_geom

    '''
PUL = spark.sql(net_flow_sql)

PUL.show()

In [None]:
import geopandas as gpd
from shapely import wkt
import matplotlib.pyplot as plt

# Net flow per zone (dropoffs minus pickups) from the PUL query above.
# Previously this collected DOL, so the "net" map silently re-plotted dropoffs.
net_rows = PUL.collect()

# Parse each zone's WKT with shapely into a GeoDataFrame tagged as EPSG:4326.
# PUL_gdf keeps its name so the following Bokeh cell plots this data.
PUL_gdf = gpd.GeoDataFrame(
    [(row['LocationID'], row['borough'], row['zone'], wkt.loads(row['the_geom']), row['value'])
     for row in net_rows],
    columns=['LocationID', 'borough', 'zone', 'geometry', 'value'],
    geometry='geometry',
    crs="EPSG:4326",
)

# Values are signed, so use a diverging colormap with symmetric limits so that
# 0 (balanced zone) sits at the neutral midpoint.
limit = PUL_gdf['value'].abs().max()
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
PUL_gdf.plot(ax=ax, column='value', legend=True, cmap='RdBu_r', vmin=-limit, vmax=limit)
plt.title('Net Flow Heatmap (dropoffs - pickups)')
plt.show()

In [None]:
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import GeoJSONDataSource, HoverTool, ColorBar, LinearColorMapper
from bokeh.transform import linear_cmap
from bokeh.palettes import Sunset11  # 选择一个颜色映射
from bokeh.tile_providers import CARTODBPOSITRON_RETINA

# Render Bokeh output inline in the notebook.
output_notebook()

# Basemap tiles are Web Mercator (EPSG:3857) but PUL_gdf is lon/lat
# (EPSG:4326); reproject the polygons so they overlay the tiles correctly.
PUL_gdf_json = PUL_gdf.to_crs(epsg=3857).to_json()

p = figure(title="LocationID Heatmap", width=800, height=400,
           x_axis_type="mercator", y_axis_type="mercator")

# Basemap tiles.
p.add_tile(CARTODBPOSITRON_RETINA)

geosource = GeoJSONDataSource(geojson=PUL_gdf_json)

# Shared mapper so the patch colours and the colour bar use identical scaling.
mapper = LinearColorMapper(palette=Sunset11,
                           low=float(PUL_gdf['value'].min()),
                           high=float(PUL_gdf['value'].max()))

p.patches('xs', 'ys', source=geosource,
          fill_color={'field': 'value', 'transform': mapper},
          line_color=None, fill_alpha=0.7)

color_bar = ColorBar(color_mapper=mapper, width=8, location=(0, 0))
p.add_layout(color_bar, 'right')

hover = HoverTool(tooltips=[
    ("LocationID", "@LocationID"),
    ("Borough", "@borough"),
    ("Zone", "@zone"),
    ("Value", "@value"),
])
p.add_tools(hover)

show(p)

## 6 车内乘客与车费的关系 水平柱状图

In [None]:
# Average fare, tip and total per passenger count. The `fatre_avg_amount`
# alias is misspelled, but the chart cell below reads it by that name.
passenger_amount_sql = '''
    SELECT passenger_count 
            ,round(avg(fare_amount),2) as fatre_avg_amount
            , round(avg(tip_amount), 2) as tip_avg_amount
            , round(avg(total_amount), 2) as total_avg_amount
    FROM df
    group by passenger_count
    order by passenger_count
    '''
passengerAndAmount = spark.sql(passenger_amount_sql)

passengerAndAmount.show()

In [None]:
from pyecharts.charts import Bar
from pyecharts import options as opts

data_df = passengerAndAmount.toPandas()

# Grouped bars per passenger count; reversal_axis() makes it horizontal.
bar = Bar()
bar.add_xaxis(data_df['passenger_count'].values.tolist())
for bar_label, amount_column in (
    ("Total Avg Amount", 'total_avg_amount'),
    ("Fatre Avg Amount", 'fatre_avg_amount'),
    ("Tip Avg Amount", 'tip_avg_amount'),
):
    bar.add_yaxis(bar_label, data_df[amount_column].values.tolist(),
                  label_opts=opts.LabelOpts(is_show=False))

bar.set_global_opts(
    title_opts=opts.TitleOpts(title="PassengerCnt vs Amount"),
    yaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=45)),
    xaxis_opts=opts.AxisOpts(name="Amount"),
)
bar.set_series_opts(label_opts=opts.LabelOpts(is_show=False))
bar.reversal_axis()

# Fixed output file name.
bar.render('image/passenger_bar_chart.html')

## 7 费率代码（RatecodeID）与车费的关系 radar（雷达图）

In [None]:
# Trip count and average amount components per RatecodeID.
ratecode_amount_sql = '''
    SELECT RatecodeID
        , COUNT(*) AS count
        , ROUND(AVG(fare_amount), 2) AS fare_avg_amount
        , ROUND(AVG(tip_amount), 2) AS tip_avg_amount
        , ROUND(AVG(tolls_amount), 2) AS tolls_avg_amount
        , ROUND(AVG(improvement_surcharge), 2) AS improvement_avg_amount
        , ROUND(AVG(total_amount), 2) AS total_avg_amount
        , ROUND(AVG(extra), 2) AS extra_avg_amount
    FROM df
    GROUP BY RatecodeID
    ORDER BY RatecodeID
    '''
RatecodeIDAndAmount = spark.sql(ratecode_amount_sql)

RatecodeIDAndAmount.show()

In [None]:
from pyspark.sql import SparkSession
import pyecharts.options as opts
from pyecharts.charts import Radar
from pyecharts.globals import ThemeType


pandas_df = RatecodeIDAndAmount.toPandas()

value_columns = ['fare_avg_amount', 'tip_avg_amount', 'tolls_avg_amount',
                 'improvement_avg_amount', 'total_avg_amount', 'extra_avg_amount']
# One list of six averages per RatecodeID row, in the same order as `indicator`.
data = [list(pandas_df.loc[i, value_columns]) for i in pandas_df.index]
indicator = ["Fare", "Tip", "Tolls", "Improvement Surcharge", "Total", "Extra"]

# Radar chart with one axis per amount component.
radar = (
    Radar(init_opts=opts.InitOpts(theme=ThemeType.MACARONS))
    .add_schema(
        schema=[opts.RadarIndicatorItem(name=ind) for ind in indicator],
        shape="circle"
    )
)

# One named series per rate code, so each polygon gets its own colour and legend
# entry. Previously the indicator list was passed as the series name and all rate
# codes were lumped into a single series.
for rate_code, values in zip(pandas_df['RatecodeID'], data):
    radar.add(series_name=f"RatecodeID {rate_code}", data=[values])

(
    radar
    .set_series_opts(label_opts=opts.LabelOpts(is_show=False))
    .set_global_opts(
        title_opts=opts.TitleOpts(title="RatecodeID雷达图"),
    )
    .render("image/ratecodeid_radar_chart.html")
)

## 8 拥堵附加费（congestion_surcharge）：与高峰时段相关的车费

In [None]:
# Average congestion surcharge per minute of the day
# (time = pickup hour * 60 + pickup minute).
congestion_by_minute_sql = '''
    select time, round(avg(congestion_surcharge), 2) as congestion_avg_surcharge
    from (
        select hour(tpep_pickup_datetime) * 60 + minute(tpep_pickup_datetime) time, congestion_surcharge
        from df
    ) t
    group by time
    order by time
    '''
timeAndcongestion_avg_surcharge = spark.sql(congestion_by_minute_sql)

timeAndcongestion_avg_surcharge.show()

In [None]:
from pyspark.sql import SparkSession
from pyecharts.charts import Line
from pyecharts import options as opts


# Rows of (minute-of-day, average surcharge), collected to the driver.
data_dict = timeAndcongestion_avg_surcharge.collect()

# Split the rows into the x-axis (minute of day) and y-axis (average surcharge) lists.
times, surcharges = [], []
for row in data_dict:
    times.append(row['time'])
    surcharges.append(row['congestion_avg_surcharge'])

line = Line()
line.add_xaxis(times)
line.add_yaxis("拥堵费", surcharges)
line.set_global_opts(title_opts=opts.TitleOpts(title="拥堵费时间序列"))

# Write the chart to an HTML file.
line.render("image/congestion_surcharge_line_chart.html")