In [1]:
# 查看当前挂载的数据集目录
!ls /home/kesci/input/

province2725  ticket3735


In [2]:
# 查看个人持久化工作区文件
!ls /home/kesci/work/

judge.csv  lstation.csv  spark-warehouse  station_count.csv


In [3]:
# 查看当前kernel下的package
#!pip list --format=columns

In [4]:
# 显示cell运行时长
%load_ext klab-autotime

In [5]:
import pandas as pd
import os,psutil 
import numpy as np
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import *
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.sql import functions

sc = SparkContext()
spark = SparkSession(sc)

time: 3.92 s


## 读取数据集

In [6]:
df = spark.read.csv('/home/kesci/input/ticket3735/ticket.csv')
df.show(10) #显示前十行数据
df.count() 

+----+--------+------+----+------------------+----------+----+----+
| _c0|     _c1|   _c2| _c3|               _c4|       _c5| _c6| _c7|
+----+--------+------+----+------------------+----------+----+----+
|null|    null|  null|null|              null|      null|null|null|
|null|    null|  null|   9|              null|      null|null|   9|
|null|    null|  null|   9|              null|      null|null|   9|
|8009|20200222|110000|   1|140225199002140040|大同汽车站|浑源|   1|
|8009|20200222|110000|   1|140225199002140040|大同汽车站|浑源|   1|
|8006|20200224| 92000|   1|140225198309220016|大同汽车站|浑源|   1|
|8006|20200224| 92000|   1|140225198309220016|大同汽车站|浑源|   1|
|8010|20200225|104000|   1|140225199206064318|大同汽车站|浑源|   1|
|8010|20200225|104000|   1|140225199306034319|大同汽车站|浑源|   1|
|8010|20200225|104000|   1|140225199206064318|大同汽车站|浑源|   1|
+----+--------+------+----+------------------+----------+----+----+
only showing top 10 rows



107226143

time: 31.4 s


## 数据整体预处理

In [7]:
#筛选列
df = df.drop('_c3').drop('_c4')
#对列重命名
df = df.select(col("_c0").alias("ScheduleCode"), col("_c1").alias("DepartDate"), 
               col("_c2").alias("DepartTime"),col("_c5").alias("StartStationName"),
               col("_c6").alias("ReachStationName"),col("_c7").alias("SeatType"))


time: 39.4 ms


In [8]:
df.show(10) #显示前十行数据

+------------+----------+----------+----------------+----------------+--------+
|ScheduleCode|DepartDate|DepartTime|StartStationName|ReachStationName|SeatType|
+------------+----------+----------+----------------+----------------+--------+
|        null|      null|      null|            null|            null|    null|
|        null|      null|      null|            null|            null|       9|
|        null|      null|      null|            null|            null|       9|
|        8009|  20200222|    110000|      大同汽车站|            浑源|       1|
|        8009|  20200222|    110000|      大同汽车站|            浑源|       1|
|        8006|  20200224|     92000|      大同汽车站|            浑源|       1|
|        8006|  20200224|     92000|      大同汽车站|            浑源|       1|
|        8010|  20200225|    104000|      大同汽车站|            浑源|       1|
|        8010|  20200225|    104000|      大同汽车站|            浑源|       1|
|        8010|  20200225|    104000|      大同汽车站|            浑源|       1|
+--------

In [9]:
#查看每一列缺失值的比例
df.agg(*[(1-(F.count(c) /F.count('*'))).alias(c+'_missing') for c in df.columns]).show()

+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------+
|ScheduleCode_missing|  DepartDate_missing|  DepartTime_missing|StartStationName_missing|ReachStationName_missing|    SeatType_missing|
+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------+
|1.296325654487340...|3.543911861081383...|3.646498783371754...|    9.540583773448574E-6|    9.382040348171472E-6|2.512446987856798E-5|
+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------+

time: 1min 17s


In [10]:
#去除完全重复的数据
#df = df.drop_duplicates() 

#删除缺失值
#df = df.dropna()
df = df.filter(df.ScheduleCode != 'N')
df = df.filter(df.DepartDate != 'N')
df = df.filter(df.DepartTime != 'N')
df = df.filter(df.StartStationName != 'N')
df = df.filter(df.ReachStationName != 'N')
df = df.filter(df.SeatType != 'N')
#展示前十行数据
df.show(10) 

+------------+----------+----------+----------------+----------------+--------+
|ScheduleCode|DepartDate|DepartTime|StartStationName|ReachStationName|SeatType|
+------------+----------+----------+----------------+----------------+--------+
|        8009|  20200222|    110000|      大同汽车站|            浑源|       1|
|        8009|  20200222|    110000|      大同汽车站|            浑源|       1|
|        8006|  20200224|     92000|      大同汽车站|            浑源|       1|
|        8006|  20200224|     92000|      大同汽车站|            浑源|       1|
|        8010|  20200225|    104000|      大同汽车站|            浑源|       1|
|        8010|  20200225|    104000|      大同汽车站|            浑源|       1|
|        8010|  20200225|    104000|      大同汽车站|            浑源|       1|
|        8010|  20200225|    104000|      大同汽车站|            浑源|       1|
|        8006|  20200226|     92000|      大同汽车站|            浑源|       1|
|        8010|  20200226|    104000|      大同汽车站|            浑源|       1|
+------------+----------+-----

In [11]:
df.printSchema()

root
 |-- ScheduleCode: string (nullable = true)
 |-- DepartDate: string (nullable = true)
 |-- DepartTime: string (nullable = true)
 |-- StartStationName: string (nullable = true)
 |-- ReachStationName: string (nullable = true)
 |-- SeatType: string (nullable = true)

time: 2.37 ms


### 班次代码预处理

In [12]:
banci = df.select('ScheduleCode') #选择班次代码所对应的列
schedule = banci.groupby('ScheduleCode').count() #将不同班次代码分组统计
schedule.show(10) #查看前10个班次代码及频次

+------------+-----+
|ScheduleCode|count|
+------------+-----+
|        2136| 1962|
|        1159| 3667|
|        2294|   18|
|        7762|   24|
|        3959|   18|
|         829| 3951|
|        8304| 1422|
|        6613|  367|
|        5325|   16|
|        2162| 1527|
+------------+-----+
only showing top 10 rows

time: 1min 24s


### 发车日期预处理

In [23]:
#提取日期数据列
date = df1.select('DepartDate')
date.show(5)

+----------+
|DepartDate|
+----------+
|  20200222|
|  20200222|
|  20200224|
|  20200224|
|  20200225|
+----------+
only showing top 5 rows

time: 366 ms


In [24]:
from pyspark.sql.functions import when,udf
from pyspark.sql.types import StringType, DateType, IntegerType

time: 1.54 ms


In [25]:
#切分日期数据
#添加新列年
def splityear(x):
    return int(x[0:4])
splityear_udf_str = udf(lambda z:splityear(z),StringType())
df1 = df1.withColumn('year',splityear_udf_str(df['DepartDate']))
df1.show(5)

+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+
|StartStationName|ScheduleCode|DepartDate|DepartTime|ReachStationName|SeatType|county|city|province|year|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|2020|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+
only showing top 5 rows

time: 1.33 s


In [26]:
#添加新列月份
def splitmonth(x):
    return int(x[4:6])
splitmonth_udf_str = udf(lambda z:splitmonth(z),StringType())
df1 = df1.withColumn('month',splitmonth_udf_str(df1['DepartDate']))
df1.show(5) 

+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+
|StartStationName|ScheduleCode|DepartDate|DepartTime|ReachStationName|SeatType|county|city|province|year|month|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|    2|
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|    2|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|    2|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|    2|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|2020|    2|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+
only showing top 5 rows

time: 525 ms


In [27]:
#添加新列天
def splitday(x):
    return int(x[6:8])
splitday_udf_str = udf(lambda z:splitday(z),StringType())
df1 = df1.withColumn('day',splitday_udf_str(df1['DepartDate'])) 
df1.show(5)

+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+---+
|StartStationName|ScheduleCode|DepartDate|DepartTime|ReachStationName|SeatType|county|city|province|year|month|day|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+---+
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 22|
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 22|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 24|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 24|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 25|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+---+
only showing top 5 rows

t

### 发车时间预处理

In [28]:
# 先将带有一位小数的字符串的小数去掉
def timemix(time):
    time = str(time)
    if ".0" in time:
        time = str(int(float(time)))
    return(time)
departtime_correct = udf(lambda z:timemix(z),StringType())
df1 =  df1.withColumn("DepartTime_correct", departtime_correct(df1['DepartTime']))

time: 23.5 ms


In [29]:
# 定义时间判断函数,3位的无法判断时间，且占比很低，故删除
def iftime(time):
    result1 = 0
    if time is None:
        return(result1)
    time = time.strip()
    if len(time) == 6:
        if int(time[0:2]) < 24:
            if int(time[2:4]) < 60:
                result1 = 1
    if len(time) == 5:
        if int(time[1:3]) < 60:
            result1 = 1
    return(result1)

time: 2.03 ms


In [30]:
df1 =  df1.withColumn("IfTime", functions.UserDefinedFunction(iftime)(df1['DepartTime_correct']))

time: 17.1 ms


In [31]:
# 定义函数提取小时
def gethour(time):
    if iftime(time):
        if len(time) == 6:
            hour =  int(time[0:2])
            return(hour)
        if len(time) == 5:
            hour =  int(time[0:1])
            return(hour)
    else:
        return None

time: 2.36 ms


In [32]:
df1 = df1.withColumn("DepartHour", functions.UserDefinedFunction(gethour)(df1['DepartTime_correct']))

time: 20.6 ms


In [33]:
df1 = df1.drop('DepartTime_correct','IfTime')
df1 = df1.filter(df1.DepartHour != 'N')
df1.show(10) 

+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+---+----------+
|StartStationName|ScheduleCode|DepartDate|DepartTime|ReachStationName|SeatType|county|city|province|year|month|day|DepartHour|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+----+-----+---+----------+
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 22|        11|
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 22|        11|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 24|         9|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 24|         9|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|2020|    2| 25|        10|
|      大同汽车站|        8010|  20200225|    104000|      

### 乘车站预处理

In [13]:
SSN = df.select('StartStationName') #选择出发车站所对应的列
group = SSN.groupby('StartStationName').count() #将不同车站分组统计
group.show(10) #查看前10个车站及频次
group.count() #一共有4677个出发车站

+------------------+------+
|  StartStationName| count|
+------------------+------+
|      太原汽车总站| 10646|
|            锦州站|221000|
|白城市中心客运总站| 20257|
|    松原公路客运站| 17082|
|         231123002|     8|
|          万达广场|    91|
|  建州汽车城公交站|     1|
|              黄石| 37072|
|        芷江汽车站| 10660|
|    仪陇马鞍汽车站| 16144|
+------------------+------+
only showing top 10 rows



4567

time: 2min 42s


In [14]:
ssn = group.toPandas()

time: 1min 22s


In [15]:
ssn.head()

Unnamed: 0,StartStationName,count
0,太原汽车总站,10646
1,锦州站,221000
2,白城市中心客运总站,20257
3,松原公路客运站,17082
4,231123002,8


time: 15.1 ms


In [16]:
ssn.to_csv('/home/kesci/work/station_count.csv',encoding='utf_8_sig')

time: 35.2 ms


In [17]:
region=pd.read_csv('/home/kesci/input/province2725/行政区划.csv')  #读取行政区划表（另附）
judge = group.select('StartStationName').dropna().toPandas() #将不重复的4677个车站另存为pandas数据框
judge['county']= None
judge['city'] = None
judge['province'] = None

time: 1min 21s


In [18]:
judge.describe()

Unnamed: 0,StartStationName,county,city,province
count,4567,0.0,0.0,0.0
unique,4567,0.0,0.0,0.0
top,苏尼特右旗汽车站,,,
freq,1,,,


time: 29.2 ms


In [19]:
## 有的车站名含有县名，有的只有地级市名，所以共匹配两次。
## 首先精确到county
for i in range(len(judge)):
    for j in range(len(region)):
        if region['county_short'].iloc[j] in judge['StartStationName'].iloc[i]:
            judge['county'][i] = region['county'].iloc[j]
            judge['city'][i] = region['city_short'].iloc[j]
            judge['province'][i] = region['province_short'].iloc[j]
            continue
## 没有county的精确到city
for i in range(len(judge)):
    for j in range(len(region)):
        if judge['county'].iloc[i] == None:
            if region['city_short'].iloc[j] in judge['StartStationName'].iloc[i]:
                judge['city'][i] = region['city_short'].iloc[j]
                judge['province'][i] = region['province_short'].iloc[j]
                continue
## 保存judge文件,judge文件包含了4796个车站及其对应的县、地级市、省。
judge.to_csv('/home/kesci/work/judge.csv',encoding='utf_8_sig')

time: 10min 24s


In [20]:
judge.describe()#3795个车站可以查到省份

Unnamed: 0,StartStationName,county,city,province
count,4567,2971,3700,3700
unique,4567,1580,325,30
top,苏尼特右旗汽车站,文昌市,儋州,广东
freq,1,40,238,425


time: 28 ms


In [21]:
#匹配出发站对应的省市县
judge = spark.read.csv('/home/kesci/work/judge.csv',header=True, inferSchema=True)
#judge.show()
judge=judge.drop('_c0')
df1=df.join(judge,'StartStationName') 

time: 519 ms


In [22]:
df1.show()

+----------------+------------+----------+----------+----------------+--------+------+----+--------+
|StartStationName|ScheduleCode|DepartDate|DepartTime|ReachStationName|SeatType|county|city|province|
+----------------+------------+----------+----------+----------------+--------+------+----+--------+
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|
|      大同汽车站|        8009|  20200222|    110000|            浑源|       1|大同区|大庆|    黑龙|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|
|      大同汽车站|        8006|  20200224|     92000|            浑源|       1|大同区|大庆|    黑龙|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|
|      大同汽车站|        8010|  20200225|    104000|            浑源|       1|大同区|大庆|    黑龙|
|

### 到达站处理

In [23]:
RSN = df1.select('ReachStationName') #选择到达车站所对应的列
group_ = RSN.groupby('ReachStationName').count() #将不同车站分组统计
group_.show(10) #查看前10个车站及频次
group_.count() 

+----------------+-----+
|ReachStationName|count|
+----------------+-----+
|            阜阳|20266|
|            铜陵| 3351|
|          西坑坑|    6|
|          西三家|  635|
|    敖包哈拉乌苏|   41|
|          三家村|   21|
|          义隆永|   30|
|            嘎查|   24|
|        阿尔本格|   12|
|        哈布其拉|    1|
+----------------+-----+
only showing top 10 rows



50798

time: 2min 58s


In [24]:
judge_ = group_.select('ReachStationName').dropna().toPandas() 
judge_['county']= None
judge_['city'] = None
judge_['province'] = None

time: 1min 29s


In [25]:
for i in range(len(judge_)):
    for j in range(len(region)):
        if region['county_short'].iloc[j] in judge_['ReachStationName'].iloc[i]:
            judge_['county'][i] = region['county_short'].iloc[j]
            judge_['city'][i] = region['city_short'].iloc[j]
            judge_['province'][i] = region['province_short'].iloc[j]
            continue

for i in range(len(judge_)):
    for j in range(len(region)):
        if judge_['county'].iloc[i] == None:
            if region['city_short'].iloc[j] in judge_['ReachStationName'].iloc[i]:
                judge_['city'][i] = region['city_short'].iloc[j]
                judge_['province'][i] = region['province_short'].iloc[j]
                continue

judge_.to_csv('/home/kesci/work/judge_.csv',encoding='utf_8_sig')

KeyboardInterrupt: 

time: 1h 58min


In [None]:
#匹配到达站对应的省市县
judge_ = spark.read.csv('/home/kesci/work/judge_.csv',header=True, inferSchema=True)

judge_=judge_.drop('_c0')
df2=df1.join(judge_,'ReachStationName') 

### 座位类型处理

In [12]:
seat = df.select('SeatType') #选择到达车站所对应的列
seat_type = seat.groupby('SeatType').count() #将不同车站分组统计
seat_type.show(10) #查看前10个车站及频次