In [36]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import *

In [37]:
biz_date = '20210304'

In [38]:
smartcar_status_info_schema = StructType([
    StructField('reg_date', StringType(), True),
    StructField('car_number', StringType(), True),
    StructField('tire_fl', StringType(), True),
    StructField('tire_fr', StringType(), True),
    StructField('tire_bl', StringType(), True),
    StructField('tire_br', StringType(), True),
    StructField('light_fl', StringType(), True),
    StructField('light_fr', StringType(), True),
    StructField('light_bl', StringType(), True),
    StructField('light_br', StringType(), True),
    StructField('engine', StringType(), True),
    StructField('break', StringType(), True),
    StructField('battery', StringType(), True),
    StructField('biz_date', StringType(), True)
])

In [39]:
smartcar_status_info = spark.read.csv("/tmp/pilot/collect/car-batch-log/wrk_date=20210303/", schema=smartcar_status_info_schema)

In [40]:
#smartcar_status_info = smartcar_status_info.withColumn('biz_date', substring('date', 0, 8))
smartcar_status_info = smartcar_status_info.withColumn("biz_date", lit(biz_date))

In [41]:
smartcar_status_info.show(5)

+--------------+----------+-------+-------+-------+-------+--------+--------+--------+--------+------+-----+-------+--------+
|      reg_date|car_number|tire_fl|tire_fr|tire_bl|tire_br|light_fl|light_fr|light_bl|light_br|engine|break|battery|biz_date|
+--------------+----------+-------+-------+-------+-------+--------+--------+--------+--------+------+-----+-------+--------+
|20210303000000|     N0001|     82|     90|     88|     96|       1|       1|       1|       1|     A|    A|     87|20210303|
|20210303000004|     N0001|     92|     85|     87|     82|       1|       1|       1|       1|     A|    B|     91|20210303|
|20210303000008|     N0001|     95|     97|     92|     87|       1|       1|       1|       1|     A|    A|     93|20210303|
|20210303000012|     N0001|     83|     83|     90|     94|       1|       1|       1|       1|     B|    A|     94|20210303|
|20210303000016|     N0001|     87|     79|     85|     86|       1|       1|       1|       1|     B|    A|     99|20

In [42]:
catalog = ''.join("""{
    "table":{ "namespace": "default", "name": "DriverCarInfo" },
    "rowkey": "key",
    "columns": {
        "key": { "cf": "rowkey", "col": "key", "type": "string" },
        "date": { "cf": "cf1", "col": "date", "type": "string" }, 
        "car_number": { "cf": "cf1", "col": "car_number", "type": "string" }, 
        "speed_pedal": { "cf": "cf1", "col": "speed_pedal", "type": "string" }, 
        "break_pedal": { "cf": "cf1", "col": "break_pedal", "type": "string" }, 
        "steer_angle": { "cf": "cf1", "col": "steer_angle", "type": "string" }, 
        "direct_light": { "cf": "cf1", "col": "direct_light", "type": "string" }, 
        "speed": { "cf": "cf1", "col": "speed", "type": "string" }, 
        "area_number": { "cf": "cf1", "col": "area_number", "type": "string" }
    }
}""".split())

In [43]:
smartcar_drive_info = spark.read \
                           .option("catalog", catalog) \
                           .format('org.apache.spark.sql.execution.datasources.hbase') \
                           .load()

In [44]:
smartcar_drive_info.show(5)

+--------------------+--------------+----------+-----------+-----------+-----------+------------+-----+-----------+
|                 key|          date|car_number|speed_pedal|break_pedal|steer_angle|direct_light|speed|area_number|
+--------------------+--------------+----------+-----------+-----------+-----------+------------+-----+-----------+
|20210303000000-A0001|20210303000000|     A0001|          3|          0|          F|           N|   15|        D04|
|20210303000000-H0002|20210303000000|     H0002|          5|          0|          F|           N|   25|        C10|
|20210303000000-Z0003|20210303000000|     Z0003|          3|          0|          F|           N|   15|        E04|
|20210303000002-A0001|20210303000002|     A0001|          3|          0|         R2|           R|   30|        D09|
|20210303000002-H0002|20210303000002|     H0002|          1|          0|         R1|           R|   30|        C04|
+--------------------+--------------+----------+-----------+-----------+

In [45]:
smartcar_master_schema = StructType([
    StructField("car_number", StringType(), True),
    StructField("sex", StringType(), True),
    StructField("age", StringType(), True),
    StructField("marriage", StringType(), True),
    StructField("region", StringType(), True),
    StructField("job", StringType(), True),
    StructField("car_capacity", StringType(), True),
    StructField("car_year", StringType(), True),
    StructField("car_model", StringType(), True)
])

In [46]:
smartcar_master = spark.read.csv("/tmp/pilot/carmaster.txt", schema=smartcar_master_schema, sep="|")

In [47]:
smartcar_master.show(5)

+----------+---+---+--------+------+--------+------------+--------+---------+
|car_number|sex|age|marriage|region|     job|car_capacity|car_year|car_model|
+----------+---+---+--------+------+--------+------------+--------+---------+
|     A0001| 여| 32|    미혼|  서울|프리랜서|        1000|    2009|        F|
|     A0002| 남| 53|    미혼|  충남|    주부|        2500|    2015|        A|
|     A0003| 여| 62|    기혼|  대전|  회사원|        2500|    2012|        B|
|     A0004| 남| 31|    미혼|  광주|  공무원|        2000|    2010|        D|
|     A0005| 남| 67|    미혼|  대구|  공무원|        1700|    2002|        C|
+----------+---+---+--------+------+--------+------------+--------+---------+
only showing top 5 rows



In [48]:
smartcar_item_buylist_schema = StructType([
    StructField("car_number", StringType(), True),
    StructField("item", StringType(), True),
    StructField("score", StringType(), True),
    StructField("month", StringType(), True)
])

In [49]:
smartcar_item_buylist = spark.read.csv('/tmp/pilot/car_item_buy_list.txt', schema=smartcar_item_buylist_schema)

In [50]:
smartcar_item_buylist.show(5)

+----------+--------+-----+------+
|car_number|    item|score| month|
+----------+--------+-----+------+
|     M0014|Item-018|    2|202103|
|     G0035|Item-015|    3|202103|
|     I0090|Item-009|    3|202103|
|     K0095|Item-018|    5|202103|
|     Y0042|Item-020|    2|202103|
+----------+--------+-----+------+
only showing top 5 rows



In [51]:
smartcar_master.orderBy(col('age').asc()).show(10)

+----------+---+---+--------+------+--------+------------+--------+---------+
|car_number|sex|age|marriage|region|     job|car_capacity|car_year|car_model|
+----------+---+---+--------+------+--------+------------+--------+---------+
|     C0072| 여| 15|    기혼|  대구|  회사원|        1200|    2014|        E|
|     F0044| 여| 15|    기혼|  충남|개인사업|        2000|    2006|        C|
|     A0023| 남| 15|    미혼|  인천|  전문직|        1700|    2014|        C|
|     F0014| 여| 15|    기혼|  경북|개인사업|        1000|    2001|        D|
|     D0057| 여| 15|    미혼|  광주|  공무원|        2500|    2004|        A|
|     B0059| 여| 15|    기혼|  전남|    주부|        2000|    2003|        A|
|     E0069| 남| 15|    미혼|  울산|    주부|        2000|    2002|        F|
|     C0026| 남| 15|    미혼|  경기|  공무원|        1200|    2010|        G|
|     E0071| 여| 15|    기혼|  제주|프리랜서|        1200|    2007|        E|
|     A0074| 여| 15|    기혼|  강원|    학생|        2000|    2003|        D|
+----------+---+---+--------+------+--------+------------+--------

In [52]:
smartcar_master_over18 = smartcar_master.where(col('age') >= 18)

In [53]:
smartcar_master_over18.orderBy(col('age').asc()).show(5)

+----------+---+---+--------+------+--------+------------+--------+---------+
|car_number|sex|age|marriage|region|     job|car_capacity|car_year|car_model|
+----------+---+---+--------+------+--------+------------+--------+---------+
|     A0049| 여| 18|    미혼|  세종|  공무원|        2000|    2014|        E|
|     A0068| 여| 18|    기혼|  부산|  회사원|        2000|    2000|        E|
|     G0036| 남| 18|    기혼|  전북|  전문직|        1700|    2002|        A|
|     E0053| 남| 18|    기혼|  전북|개인사업|        1000|    2007|        B|
|     F0011| 남| 18|    미혼|  대전|  회사원|        3000|    2000|        C|
+----------+---+---+--------+------+--------+------------+--------+---------+
only showing top 5 rows



In [54]:
# 차량 정보와 운전자 정보 결합
managed_smartcar_status_info = smartcar_status_info.where(col('biz_date') == biz_date) \
                                                   .join(smartcar_master_over18, 
                                                         on=['car_number'], 
                                                         how="inner")

In [55]:
managed_smartcar_status_info.show(5)

+----------+--------------+-------+-------+-------+-------+--------+--------+--------+--------+------+-----+-------+--------+---+---+--------+------+------+------------+--------+---------+
|car_number|      reg_date|tire_fl|tire_fr|tire_bl|tire_br|light_fl|light_fr|light_bl|light_br|engine|break|battery|biz_date|sex|age|marriage|region|   job|car_capacity|car_year|car_model|
+----------+--------------+-------+-------+-------+-------+--------+--------+--------+--------+------+-----+-------+--------+---+---+--------+------+------+------------+--------+---------+
|     N0001|20210303000000|     82|     90|     88|     96|       1|       1|       1|       1|     A|    A|     87|20210303| 남| 53|    기혼|  전남|공무원|        2000|    2011|        D|
|     N0001|20210303000004|     92|     85|     87|     82|       1|       1|       1|       1|     A|    B|     91|20210303| 남| 53|    기혼|  전남|공무원|        2000|    2011|        D|
|     N0001|20210303000008|     95|     97|     92|     87|       1|   

In [56]:
smartcar_drive_info = smartcar_drive_info.withColumn('biz_date', substring('date', 0, 8))

In [57]:
managed_smartcar_drive_info = smartcar_drive_info.where(col('biz_date') == biz_date) \
                                                 .join(smartcar_master_over18, on=["car_number"], how="inner") \
                                                 .select('car_number', 'sex', 'age', 'marriage', 'region', 'job', 'car_capacity', 'car_year', 'car_model', 
                                                         'speed_pedal', 'break_pedal', 'steer_angle', 'direct_light', 'speed', 'area_number', 'date', 'biz_date')

In [58]:
managed_smartcar_drive_info.show(5)

+----------+---+---+--------+------+--------+------------+--------+---------+-----------+-----------+-----------+------------+-----+-----------+--------------+--------+
|car_number|sex|age|marriage|region|     job|car_capacity|car_year|car_model|speed_pedal|break_pedal|steer_angle|direct_light|speed|area_number|          date|biz_date|
+----------+---+---+--------+------+--------+------------+--------+---------+-----------+-----------+-----------+------------+-----+-----------+--------------+--------+
|     A0001| 여| 32|    미혼|  서울|프리랜서|        1000|    2009|        F|          3|          0|          F|           N|   15|        D04|20210303000000|20210303|
|     H0002| 여| 49|    미혼|  제주|  전문직|        2500|    2011|        F|          5|          0|          F|           N|   25|        C10|20210303000000|20210303|
|     A0001| 여| 32|    미혼|  서울|프리랜서|        1000|    2009|        F|          3|          0|         R2|           R|   30|        D09|20210303000002|20210303|
|     H0002|

In [59]:
grouped_managed_smartcar_drive_info1 = managed_smartcar_drive_info.where(col('biz_date') == biz_date) \
                                                                  .groupBy(col('car_number'), col('biz_date')) \
                                                                  .agg(avg('speed_pedal').alias('speed_p_avg_by_carnum'), 
                                                                       avg('break_pedal').alias('break_p_avg_by_carnum'))                                                    
    
grouped_managed_smartcar_drive_info2 = managed_smartcar_drive_info.where( (col('biz_date') == biz_date) & (col('steer_angle').isin(['L2', 'L3', 'R2', 'R3']))) \
                                                                  .groupBy(col('car_number')) \
                                                                  .agg(count('car_number').alias('steer_a_count'))
                                                                  
smartcar_drive_info_stats1 = managed_smartcar_drive_info.agg(avg('speed_pedal').alias('speed_p_avg'), 
                                                             avg('break_pedal').alias('break_p_avg'))

smartcar_drive_info_stats2 = managed_smartcar_drive_info.groupBy('car_number') \
                                                        .agg(avg('speed_pedal').alias('speed_p_avg_by_carnum'), 
                                                             avg('break_pedal').alias('break_p_avg_by_carnum')) \
                                                        .agg(stddev_pop('speed_p_avg_by_carnum').alias('speed_p_std'), 
                                                             stddev_pop('break_p_avg_by_carnum').alias('break_p_std'))

In [60]:
grouped_managed_smartcar_drive_info1.show(5)
grouped_managed_smartcar_drive_info2.show(5)

+----------+--------+---------------------+---------------------+
|car_number|biz_date|speed_p_avg_by_carnum|break_p_avg_by_carnum|
+----------+--------+---------------------+---------------------+
|     A0001|20210303|   1.5434108527131782|  0.28888888888888886|
|     H0002|20210303|   1.5500129232359783|    0.302145257172396|
+----------+--------+---------------------+---------------------+

+----------+-------------+
|car_number|steer_a_count|
+----------+-------------+
|     A0001|         3870|
|     H0002|         3869|
+----------+-------------+



In [61]:
smartcar_drive_info_stats1.show()
smartcar_drive_info_stats2.show()

+------------------+-------------------+
|       speed_p_avg|        break_p_avg|
+------------------+-------------------+
|1.5467114614291253|0.29551621656544774|
+------------------+-------------------+

+--------------------+--------------------+
|         speed_p_std|         break_p_std|
+--------------------+--------------------+
|0.003301035261400...|0.006628184141753...|
+--------------------+--------------------+



In [62]:
speed_p_avg, break_p_avg, speed_p_std, break_p_std = smartcar_drive_info_stats1.select('speed_p_avg').first()[0], \
                                                     smartcar_drive_info_stats1.select('break_p_avg').first()[0], \
                                                     smartcar_drive_info_stats2.select('speed_p_std').first()[0], \
                                                     smartcar_drive_info_stats2.select('break_p_std').first()[0]

In [63]:
speed_p_avg, break_p_avg, speed_p_std, break_p_std

(1.5467114614291253,
 0.29551621656544774,
 0.0033010352614000205,
 0.0066281841417535625)

In [64]:
managed_smartcar_symptom_info = grouped_managed_smartcar_drive_info1.join(grouped_managed_smartcar_drive_info2, on=['car_number'], how='inner') \
                                                                    .select('car_number',
                                                                            'speed_p_avg_by_carnum',
                                                                            when( (col('speed_p_avg_by_carnum') - speed_p_avg) / speed_p_std > 2, '비정상' ).otherwise('정상').alias('speed_p_symptom_score'),
                                                                            'break_p_avg_by_carnum',
                                                                            when( (col('break_p_avg_by_carnum') - break_p_avg) / break_p_std > 2, '비정상' ).otherwise('정상').alias('break_p_symptom_score'),
                                                                            'steer_a_count',
                                                                            when( col('steer_a_count') > 1000, '비정상' ).otherwise('정상').alias('steer_p_symptom_score'),
                                                                            'biz_date')

In [65]:
managed_smartcar_symptom_info.show(5)

+----------+---------------------+---------------------+---------------------+---------------------+-------------+---------------------+--------+
|car_number|speed_p_avg_by_carnum|speed_p_symptom_score|break_p_avg_by_carnum|break_p_symptom_score|steer_a_count|steer_p_symptom_score|biz_date|
+----------+---------------------+---------------------+---------------------+---------------------+-------------+---------------------+--------+
|     A0001|   1.5434108527131782|                 정상|  0.28888888888888886|                 정상|         3870|               비정상|20210303|
|     H0002|   1.5500129232359783|                 정상|    0.302145257172396|                 정상|         3869|               비정상|20210303|
+----------+---------------------+---------------------+---------------------+---------------------+-------------+---------------------+--------+



In [66]:
managed_smartcar_emergency_check_info = managed_smartcar_status_info.where(col('biz_date') == biz_date) \
                                                                    .select('car_number', 'biz_date') \
                                                                    .distinct() \
                                                                    .join(managed_smartcar_status_info.where(col('biz_date') == biz_date) \
                                                                                                      .groupBy('car_number') \
                                                                                                      .agg(avg('tire_fl').alias('tire_fl_avg'),
                                                                                                           avg('tire_fr').alias('tire_fr_avg'),
                                                                                                           avg('tire_bl').alias('tire_bl_avg'),
                                                                                                           avg('tire_br').alias('tire_br_avg'),
                                                                                                           lit('타이어 점검').alias('tire_symptom')) \
                                                                                                      .filter( (col('tire_fl_avg') < 80 ) | (col('tire_fr_avg') < 80) | (col('tire_bl_avg') < 80) | (col('tire_br_avg') < 80) ),
                                                                          on=['car_number'],
                                                                          how='left_outer') \
                                                                    .join(managed_smartcar_status_info.where((col('biz_date') == biz_date) & ( (col('light_fl') == '2') | (col('light_fr') == '2') | (col('light_bl') == '2') | (col('light_br') == '2') )) \
                                                                                                      .select('car_number', lit('라이트 점검').alias('light_symptom')) \
                                                                                                      .distinct(),
                                                                         on=['car_number'],
                                                                         how='left_outer') \
                                                                    .join(managed_smartcar_status_info.where((col('biz_date') == biz_date) & (col('engine') == 'C')) \
                                                                                                      .select('car_number', lit('엔진 점검').alias('engine_symptom')) \
                                                                                                      .distinct(),
                                                                         on=['car_number'],
                                                                         how='left_outer') \
                                                                    .join(managed_smartcar_status_info.where((col('biz_date') == biz_date) & (col('break') == 'C')) \
                                                                                                      .select('car_number', lit('브레이크 점검').alias('break_symptom')) \
                                                                                                      .distinct(),
                                                                         on=['car_number'],
                                                                         how='left_outer') \
                                                                    .join(managed_smartcar_status_info.where(col('biz_date') == biz_date) \
                                                                                                      .groupBy('car_number') \
                                                                                                      .agg(avg('battery').alias('battery_avg')) \
                                                                                                      .filter(col('battery_avg') < 30) \
                                                                                                      .select('car_number', 'battery_avg', lit('배터리 점검').alias('battery_symptom')),
                                                                         on=['car_number'],
                                                                         how='left_outer') \
                                                                    .filter((col('tire_symptom') != 'null') | (col('light_symptom') != 'null') | (col('engine_symptom') != 'null') | (col('break_symptom') != 'null') | (col('battery_symptom') != 'null')) \
                                                                    .select('car_number', 'tire_symptom', 'light_symptom', 'engine_symptom', 'break_symptom', 'battery_symptom', 'biz_date')

In [67]:
managed_smartcar_emergency_check_info.show(5)

+----------+------------+-------------+--------------+-------------+---------------+--------+
|car_number|tire_symptom|light_symptom|engine_symptom|break_symptom|battery_symptom|biz_date|
+----------+------------+-------------+--------------+-------------+---------------+--------+
|     N0001|        null|         null|     엔진 점검|         null|           null|20210303|
+----------+------------+-------------+--------------+-------------+---------------+--------+



In [68]:
managed_smartcar_item_buylist_info = smartcar_item_buylist.filter(col('month') == '202103') \
                                                          .join(smartcar_master_over18, 
                                                                on=['car_number'],
                                                                how='inner') \
                                                          .select('car_number', 'sex', 'age', 'marriage', 'region', 'job', 'car_capacity', 'car_year', 'car_model', 'item', 'score', col('month').alias('biz_month'))

In [69]:
managed_smartcar_item_buylist_info.show(5)

+----------+---+---+--------+------+--------+------------+--------+---------+--------+-----+---------+
|car_number|sex|age|marriage|region|     job|car_capacity|car_year|car_model|    item|score|biz_month|
+----------+---+---+--------+------+--------+------------+--------+---------+--------+-----+---------+
|     G0035| 남| 65|    기혼|  제주|  공무원|        1200|    2004|        H|Item-015|    3|   202103|
|     I0090| 남| 40|    기혼|  경남|개인사업|        1000|    2000|        A|Item-009|    3|   202103|
|     K0095| 여| 48|    미혼|  경기|개인사업|        1200|    2008|        A|Item-018|    5|   202103|
|     Y0042| 남| 66|    기혼|  제주|  전문직|        1000|    2015|        A|Item-020|    2|   202103|
|     W0023| 남| 44|    미혼|  대구|  회사원|        1200|    2015|        D|Item-030|    2|   202103|
+----------+---+---+--------+------+--------+------------+--------+---------+--------+-----+---------+
only showing top 5 rows

