In [1]:
import pyspark.sql.types as t

schema = t.StructType([
    t.StructField('driverID', t.StringType(), False),
    t.StructField('carPlateNumber', t.StringType(), False),
    t.StructField('Latitude', t.DoubleType(), False),
    t.StructField('Longtitude', t.DoubleType(), False),
    t.StructField('Speed', t.IntegerType(), False),
    t.StructField('Direction', t.IntegerType(), True),
    t.StructField('siteName', t.StringType(), True),
    t.StructField('Time', t.TimestampType(), False),
    t.StructField('isRapidlySpeedup', t.ByteType(), True),
    t.StructField('isRapidlySlowdown', t.ByteType(), True),
    t.StructField('isNeutralSlide', t.ByteType(), True),
    t.StructField('isNeutralSlideFinished', t.ByteType(), True),
    t.StructField('neutralSlideTime', t.IntegerType(), True),
    t.StructField('isOverspeed', t.ByteType(), True),
    t.StructField('isOverspeedFinished', t.ByteType(), True),
    t.StructField('overspeedTime', t.IntegerType(), True),
    t.StructField('isFatigueDriving', t.ByteType(), True),
    t.StructField('isHthrottleStop', t.ByteType(), True),
    t.StructField('isOilLeak', t.ByteType(), True),
])

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()

In [12]:
df = spark.read.csv('./detail-records', schema=schema)

df.count()

413450

In [4]:
df.show()

+-------------+--------------+---------+----------+-----+---------+--------+-------------------+----------------+-----------------+--------------+----------------------+----------------+-----------+-------------------+-------------+----------------+---------------+---------+
|     driverID|carPlateNumber| Latitude|Longtitude|Speed|Direction|siteName|               Time|isRapidlySpeedup|isRapidlySlowdown|isNeutralSlide|isNeutralSlideFinished|neutralSlideTime|isOverspeed|isOverspeedFinished|overspeedTime|isFatigueDriving|isHthrottleStop|isOilLeak|
+-------------+--------------+---------+----------+-----+---------+--------+-------------------+----------------+-----------------+--------------+----------------------+----------------+-----------+-------------------+-------------+----------------+---------------+---------+
|haowei1000008|      华A709GB|38.503856|106.209375|  102|       66|    null|2017-01-02 16:00:10|            null|             null|          null|                  null|    

In [5]:
df.createOrReplaceTempView("records")

In [18]:
def dump_summary():
    return spark.sql("""
        SELECT CONCAT(driverID, "+", carPlateNumber) as `Key`,
            COUNT(isOverspeed = 1) as `Total numbers of overspeeding accidents`,
            COUNT(isNeutralSlide = 1) as `Total numbers of neutral slide accidents`,
            COUNT(isFatigueDriving = 1) as `Total numbers of fatigue driving accidents`,
            COUNT(isHthrottleStop = 1) as `Total numbers of H-throttle stop accidents`,
            COUNT(isOilLeak = 1) as `Total numbers of oil leak accidents`,
            COUNT(isRapidlySpeedup = 1) as `Total numbers of rapid speed up accidents`,
            COUNT(isRapidlySlowdown = 1) as `Total numbers of rapid slow down accidents`,
            MAX(overspeedTime) as `Max overspeeding time`,
            SUM(overspeedTime) as `Total overspeeding time`
        FROM records
        GROUP BY driverID, carPlateNumber
    """)

def dump_driver_data_by_date(day: int):
    return spark.sql(f"""
        SELECT driverID, carPlateNumber,
            COUNT(isOverspeed = 1) as `Total numbers of overspeeding accidents`,
            COUNT(isNeutralSlide = 1) as `Total numbers of neutral slide accidents`,
            COUNT(isFatigueDriving = 1) as `Total numbers of fatigue driving accidents`,
            COUNT(isHthrottleStop = 1) as `Total numbers of H-throttle stop accidents`,
            COUNT(isOilLeak = 1) as `Total numbers of oil leak accidents`,
            COUNT(isRapidlySpeedup = 1) as `Total numbers of rapid speed up accidents`,
            COUNT(isRapidlySlowdown = 1) as `Total numbers of rapid slow down accidents`,
            MAX(overspeedTime) as `Max overspeeding time`,
            SUM(overspeedTime) as `Total overspeeding time`
        FROM records
        WHERE to_date(time, "yyyy/mm/dd") = '2017-01-{day}'
        GROUP BY driverID, carPlateNumber
    """)

In [19]:
summary = dump_summary()
summary.toPandas()

Unnamed: 0,Key,Total numbers of overspeeding accidents,Total numbers of neutral slide accidents,Total numbers of fatigue driving accidents,Total numbers of H-throttle stop accidents,Total numbers of oil leak accidents,Total numbers of rapid speed up accidents,Total numbers of rapid slow down accidents,Max overspeeding time,Total overspeeding time
0,zouan1000007+华A58M83,3181,315,3594,389,385,360,385,115,31248
1,duxu1000009+华AT75H8,2301,247,2814,264,248,238,284,94,22338
2,hanhui1000002+华AZI419,3349,327,3997,433,371,401,444,98,31813
3,panxian1000005+华AX542C,3531,330,4307,417,441,395,434,96,33946
4,haowei1000008+华A709GB,2639,255,3204,312,318,321,314,112,25522
5,shenxian1000004+华ADJ750,3126,297,3767,383,366,374,356,123,31494
6,likun1000003+华AVM936,3044,291,3552,347,376,341,354,137,28728
7,zengpeng1000000+华AZQ110,2763,272,3274,284,337,340,344,105,25479
8,xiezhi1000006+华A6CU11,2535,254,2931,312,279,255,310,80,23942
9,xiexiao1000001+华AEB132,2324,248,2720,314,253,264,261,93,23434


In [11]:
day1 = dump_driver_data_by_date(1)
day1.toPandas()

Unnamed: 0,driverID,carPlateNumber,Total numbers of overspeeding accidents,Total numbers of neutral slide accidents,Total numbers of fatigue driving accidents,Total numbers of H-throttle stop accidents,Total numbers of oil leak accidents,Total numbers of rapid speed up accidents,Total numbers of rapid slow down accidents,Max overspeeding time,Total overspeeding time
0,zouan1000007,华A58M83,192,17,224,19,24,17,27,66,1810
1,duxu1000009,华AT75H8,504,49,570,49,47,42,55,76,4836
2,hanhui1000002,华AZI419,467,36,564,59,45,60,54,98,5324
3,panxian1000005,华AX542C,219,22,264,18,30,19,27,59,2194
4,xiexiao1000001,华AEB132,477,53,580,61,47,48,44,90,4866
5,haowei1000008,华A709GB,107,8,113,15,13,8,9,73,895
6,shenxian1000004,华ADJ750,220,25,262,28,31,31,30,65,2217
7,likun1000003,华AVM936,422,39,462,49,45,46,45,83,4080
8,zengpeng1000000,华AZQ110,225,19,283,17,31,30,29,105,2295
9,xiezhi1000006,华A6CU11,404,47,497,54,47,41,39,75,3906
