In [1]:
#由于文件数量较小，且为了代码的美观性，采用每一次运行需要重新修改路径的方式
path="data\d1_m8"
path2="data_solve\d1_m8"
path3="data_max_t\d1_m8"
file=path+'.csv'

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.conf
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import *
import pyspark.sql.functions as F
spark = SparkSession.builder.master().getOrCreate()
sc = spark.sparkContext
sqlContext = pyspark.SQLContext(sc)
df1 = spark.read.options(header='True', inferSchema='True', delimiter=',').csv(file)
#df1.show(2)

In [3]:
#数据子集选择
df2 = df1.drop(' Quality Index').drop(' NTIS Model Version')
#df2.show(2)

In [4]:
#列名重命名
df3=df2.withColumnRenamed(" Local Time", "Local Time").withColumnRenamed(" Day Type ID","Day Type ID").\
    withColumnRenamed(" Total Carriageway Flow", "Total CF").\
    withColumnRenamed(" Total Flow vehicles less than 5.2m","Total Fv<5.2m").\
    withColumnRenamed(" Total Flow vehicles 5.21m - 6.6m", "Total Fv[5.21m - 6.6m]").\
    withColumnRenamed(" Total Flow vehicles 6.61m - 11.6m","Total Fv[6.61m - 11.6m]").\
    withColumnRenamed(" Total Flow vehicles above 11.6m", "Total Fv> 11.6m").\
    withColumnRenamed(" Speed Value","Speed Value")
#df3.show(2)

In [5]:
df3.toPandas().isnull().sum()

Local Date                  0
Local Time                  0
Day Type ID                 0
Total CF                    7
Total Fv<5.2m               7
Total Fv[5.21m - 6.6m]      7
Total Fv[6.61m - 11.6m]     7
Total Fv> 11.6m             7
Speed Value                14
dtype: int64

In [12]:
from pyspark.sql.window import Window
forward = Window.partitionBy('Local Date').orderBy('Local Time').rowsBetween(
    Window.unboundedPreceding, Window.currentRow)

In [13]:
df3=df3.withColumn('Speed_forward', last('Speed Value', ignorenulls=True).over(forward))#将空值进行填充

In [15]:
df_clear=df3.dropna(subset='Total CF')#删除存在空值的行

In [16]:
df_clear.toPandas().isnull().sum()

Local Date                 0
Local Time                 0
Day Type ID                0
Total CF                   0
Total Fv<5.2m              0
Total Fv[5.21m - 6.6m]     0
Total Fv[6.61m - 11.6m]    0
Total Fv> 11.6m            0
Speed Value                7
Speed_forward              0
dtype: int64

In [17]:
# 每一天的平均值车速，并将值取出
ave_speed = df_clear.groupBy('Local Date').agg({'Speed_forward': 'mean'})
#ave_speed.orderBy("Local Date").show()
ave_speed1=ave_speed.withColumn('Local Date', regexp_replace('Local Date','/','-'))
ave_speed1 = ave_speed1.withColumn('date', to_date(ave_speed1['Local Date']))
ave_speed1 = ave_speed1.withColumn("week",dayofweek('date'))
day_ave_spe = ave_speed1.orderBy("date").select(F.collect_list('avg(Speed_forward)')).first()[0]
#ave_speed1.select("date","week","avg(Speed_forward)").orderBy('date').toPandas().to_csv(path2+"Speed.csv") #导出csv文件
#ave_speed1.select("date","week","avg(Speed_forward)").orderBy('date').show(3)

In [18]:
#每天的出行量，并将值取出
ave_cf = df_clear.groupBy('Local Date').agg({'Total CF': 'sum'})
ave_cf1=ave_cf.withColumn('Local Date', regexp_replace('Local Date','/','-'))
ave_cf1 = ave_cf1.withColumn('date', to_date(ave_cf1['Local Date']))
ave_cf1 = ave_cf1.withColumn("week",dayofweek('date'))
day_ave_cf = ave_cf1.orderBy('date').select(F.collect_list('sum(Total CF)')).first()[0]
#ave_cf1.orderBy('date').show()
#ave_cf1.select("date","week","sum(Total CF)").orderBy('date').toPandas().to_csv(path2+"TotalCF.csv") #导出csv文件
#ave_cf1.select("date","week","sum(Total CF)").orderBy('date').show(3)

In [32]:
def max_hour_PHD(df_day,need_list):
    #df_day.show()
    df_day1 = (df_day.withColumn('Local Time', F.col('Local Time').cast(TimestampType())))
    df_day1=df_day1.withColumn("hour", hour(to_timestamp("Local Time","HH:mm:ss")))\
                .withColumn("minute",minute(to_timestamp("Local Time","HH:mm:ss")))
    df_hour_speed=df_day1.groupBy('hour').agg({'Speed Value': 'mean'}).orderBy("hour")
    #df_hour_speed.show()
    day_speed_data = df_hour_speed.select(F.collect_list('avg(Speed Value)')).first()[0]
    #day_speed_data#按时间排序的小时平均车速
    #获取高峰小时
    df_hour_cf=df_day1.groupBy('hour').agg({'Total CF': 'sum'})
    #df_hour_cf.orderBy("hour").show()
    df_hour_cf=df_hour_cf.withColumnRenamed("sum(Total CF)", "sum_Total_CF")
    df_sql_cf=df_hour_cf.createOrReplaceTempView("carflow")
    """ spark.sql("select * from carflow\
                where sum_Total_CF=\
                (select max(sum_Total_CF) from carflow)").show() """
    m=spark.sql("select hour from carflow where sum_Total_CF=(select max(sum_Total_CF) from carflow)")
    max_t=m.select(F.collect_list('hour')).first()[0][0]
    print("高峰小时为：",max_t)
    need_list.append(max_t)
    #求高峰小时系数
    df_high_cf=df_day1.filter(df_day1['hour'] == max_t)
    max_cf15=df_high_cf.agg({'Total CF': 'max'}).select(F.collect_list('max(Total CF)')).first()[0][0]
    max_hour=df_hour_cf.agg({'sum_Total_CF': 'max'}).select(F.collect_list('max(sum_Total_CF)')).first()[0][0]
    PHF15=max_hour/(4*max_cf15)
    print("15分钟高峰小时系数为：{:.2f}".format(PHF15))
    need_list.append(PHF15)
    """拥堵指数的计算
    df_freeflow=df_day1.groupBy('hour').agg({'Total CF': 'sum'}).orderBy("hour")
    df_freeflow=df_freeflow.withColumnRenamed("sum(Total CF)", "sum_Total_CF")
    #df_freeflow.show()
    df_sql_freeflow=df_freeflow.createOrReplaceTempView("carflow1")
    freeflow_hour=spark.sql("select hour from carflow1 where sum_Total_CF=(select min(sum_Total_CF) from carflow1)").select(F.collect_list('hour')).first()[0][0]
    print("自由流的时间：",freeflow_hour)
    df_free_min=df_day1.filter(df_day1['hour'] == freeflow_hour)
    min_speed_value=df_free_min.agg({'Speed Value': 'mean'}).select(F.collect_list('avg(Speed Value)')).first()[0][0]
    print("自由流的速度为：",int(min_speed_value)) """

In [33]:
import csv

def create_csv():
        path_csv = path3+"max.csv"
        with open(path_csv,'w',newline='') as f:
                csv_write = csv.writer(f)
                csv_head = ["day","max_t","PHD15"]
                csv_write.writerow(csv_head)
def write_csv(data_row):
        path_csv = path3+"max.csv"
        with open(path_csv,'a',newline='') as f:
                csv_write = csv.writer(f)
                csv_write.writerow(data_row)


In [34]:
create_csv()
for i in range (1,32) :
    day="2019/8/"+str(i)
    need_list=list()
    need_list.append(day)
    df_day = df_clear.filter(df_clear['Local Date'] == day)#条件过滤
    print(day)
    max_hour_PHD(df_day,need_list)
    write_csv(need_list)

2019/8/1
高峰小时为： 7
15分钟高峰小时系数为：0.96
2019/8/2
高峰小时为： 7
15分钟高峰小时系数为：0.96
2019/8/3
高峰小时为： 9
15分钟高峰小时系数为：0.90
2019/8/4
高峰小时为： 10
15分钟高峰小时系数为：0.97
2019/8/5
高峰小时为： 7
15分钟高峰小时系数为：0.90
2019/8/6
高峰小时为： 7
15分钟高峰小时系数为：0.93
2019/8/7
高峰小时为： 7
15分钟高峰小时系数为：0.94
2019/8/8
高峰小时为： 7
15分钟高峰小时系数为：0.92
2019/8/9
高峰小时为： 7
15分钟高峰小时系数为：0.96
2019/8/10
高峰小时为： 10
15分钟高峰小时系数为：0.96
2019/8/11
高峰小时为： 10
15分钟高峰小时系数为：0.95
2019/8/12
高峰小时为： 6
15分钟高峰小时系数为：0.89
2019/8/13
高峰小时为： 7
15分钟高峰小时系数为：0.93
2019/8/14
高峰小时为： 7
15分钟高峰小时系数为：0.95
2019/8/15
高峰小时为： 7
15分钟高峰小时系数为：0.97
2019/8/16
高峰小时为： 8
15分钟高峰小时系数为：0.97
2019/8/17
高峰小时为： 9
15分钟高峰小时系数为：0.95
2019/8/18
高峰小时为： 16
15分钟高峰小时系数为：0.98
2019/8/19
高峰小时为： 7
15分钟高峰小时系数为：0.95
2019/8/20
高峰小时为： 7
15分钟高峰小时系数为：0.96
2019/8/21
高峰小时为： 7
15分钟高峰小时系数为：0.94
2019/8/22
高峰小时为： 7
15分钟高峰小时系数为：0.98
2019/8/23
高峰小时为： 7
15分钟高峰小时系数为：0.96
2019/8/24
高峰小时为： 10
15分钟高峰小时系数为：0.89
2019/8/25
高峰小时为： 10
15分钟高峰小时系数为：0.93
2019/8/26
高峰小时为： 17
15分钟高峰小时系数为：0.93
2019/8/27
高峰小时为： 7
15分钟高峰小时系数为：0.89
2019/8/28
高峰小时为： 7
15分钟高峰小时系数为