## jsonデータ読み込み


In [3]:
# Name of the storage account used to build the abfss:// paths below.
# Replace the placeholder with a real account name before running the notebook.
account_name = "<ストレージアカウント名>"

In [4]:
# Build the abfss:// URI of the sensor JSON files; the three wildcards match
# three nested directory levels below SensorDatasets/sensor/.
datapath = f"abfss://datalake@{account_name}.dfs.core.windows.net/SensorDatasets/sensor/*/*/*/"

# Load every matching JSON file into one DataFrame (schema is inferred by Spark).
df = spark.read.format("json").load(datapath)

# Print the inferred schema; the row count is the cell's displayed result.
df.printSchema()
df.count()

root
 |-- Cycle: long (nullable = true)
 |-- DeviceId: string (nullable = true)
 |-- EventEnqueuedUtcTime: string (nullable = true)
 |-- EventProcessedUtcTime: string (nullable = true)
 |-- IoTHub: struct (nullable = true)
 |    |-- ConnectionDeviceGenerationId: string (nullable = true)
 |    |-- ConnectionDeviceId: string (nullable = true)
 |    |-- CorrelationId: string (nullable = true)
 |    |-- EnqueuedTime: string (nullable = true)
 |    |-- MessageId: string (nullable = true)
 |    |-- StreamId: string (nullable = true)
 |-- PartitionId: long (nullable = true)
 |-- Period: long (nullable = true)
 |-- Sensor11: double (nullable = true)
 |-- Sensor14: double (nullable = true)
 |-- Sensor15: double (nullable = true)
 |-- Sensor9: double (nullable = true)

1038971

## データ加工

In [5]:
from pyspark.sql.functions import from_utc_timestamp,to_date,hour

# Time zone used for the local-time columns. A region-based zone ID is used
# because Spark's documentation discourages short names such as "JST" as
# ambiguous. Change this value if another zone is required; the *_JST column
# names are kept unchanged because later cells refer to them.
LOCAL_TIME_ZONE = "Asia/Tokyo"

# Interpret EventEnqueuedUtcTime as UTC and shift it to the local time zone.
df_datetime_conv = df.withColumn(
    "timestamp_JST",
    from_utc_timestamp("EventEnqueuedUtcTime", LOCAL_TIME_ZONE),
)

# Derive the local date and hour from the converted timestamp, then drop the
# IoT Hub metadata and the original UTC time columns.
df_date_conv = (
    df_datetime_conv
    .withColumn("date_JST", to_date("timestamp_JST"))
    .withColumn("hour_JST", hour("timestamp_JST"))
    .drop("IoTHub", "PartitionId", "EventProcessedUtcTime", "EventEnqueuedUtcTime")
)


In [6]:
# Check: print the first rows of the converted DataFrame
df_date_conv.show()

+-----+---------+------+--------+--------+--------+-------+--------------------+----------+--------+
|Cycle| DeviceId|Period|Sensor11|Sensor14|Sensor15|Sensor9|       timestamp_JST|  date_JST|hour_JST|
+-----+---------+------+--------+--------+--------+-------+--------------------+----------+--------+
|   68|N3172FJ-2|    49|   44.29| 8061.17|  9.1958|8728.68|2020-04-28 09:00:...|2020-04-28|       9|
|   68|N3172FJ-1|    49|   41.98| 8087.09|  9.3633|8328.02|2020-04-28 09:00:...|2020-04-28|       9|
|   68|N4172FJ-2|    49|   41.94|  8074.2|  9.3415|8306.42|2020-04-28 09:00:...|2020-04-28|       9|
|   68|N4172FJ-1|    49|   45.17| 8132.85|  8.6752|8775.17|2020-04-28 09:00:...|2020-04-28|       9|
|   69|N1172FJ-2|    49|   45.47| 8125.21|  8.6166| 8759.7|2020-04-28 09:00:...|2020-04-28|       9|
|   69|N1172FJ-1|    49|   47.35| 8140.41|   8.398|9058.72|2020-04-28 09:00:...|2020-04-28|       9|
|   69|N2172FJ-2|    49|   41.77| 8092.79|  9.3043|8323.55|2020-04-28 09:00:...|2020-04-28|

In [11]:
from pyspark.sql.functions import row_number,col,when
from pyspark.sql.window import Window

# Number the rows and flag the last cycle before maintenance occurs.
# Rows are ranked inside each (Period, DeviceId, date_JST) group by Cycle,
# highest first, so rank 1 is the final cycle of that group.
# NOTE(review): date_JST is part of the partition key, so a Period that spans
# several dates gets one flagged row per date - confirm this is intended.
latest_cycle_first = (
    Window
    .partitionBy("Period", "DeviceId", "date_JST")
    .orderBy(col("Cycle").desc())
)

df_rownum = df_date_conv.withColumn("row_num", row_number().over(latest_cycle_first))

# EndOfPeriod is 1 on the top-ranked row of each group and 0 on every other
# row; the helper ranking column is removed afterwards.
is_final_cycle = col("row_num") == 1
df_curated = (
    df_rownum
    .withColumn("EndOfPeriod", when(is_final_cycle, 1).otherwise(0))
    .drop("row_num")
)

In [12]:
# Check: print the first rows of the curated DataFrame, including EndOfPeriod
df_curated.show()

+-----+---------+------+--------+--------+--------+-------+--------------------+----------+--------+-----------+
|Cycle| DeviceId|Period|Sensor11|Sensor14|Sensor15|Sensor9|       timestamp_JST|  date_JST|hour_JST|EndOfPeriod|
+-----+---------+------+--------+--------+--------+-------+--------------------+----------+--------+-----------+
|   54|N3172FJ-2|     1|   41.94|  8066.8|  9.3881|8312.53|2020-04-29 08:50:...|2020-04-29|       8|          1|
|   53|N3172FJ-2|     1|   36.99| 7864.04| 10.9041|8001.09|2020-04-29 08:50:...|2020-04-29|       8|          0|
|   52|N3172FJ-2|     1|   36.89|  7864.5| 10.9291| 8004.4|2020-04-29 08:49:...|2020-04-29|       8|          0|
|   51|N3172FJ-2|     1|   42.04| 8052.26|  9.3318|8336.03|2020-04-29 08:49:...|2020-04-29|       8|          0|
|   50|N3172FJ-2|     1|   45.54| 8114.55|   8.649|8772.46|2020-04-29 08:49:...|2020-04-29|       8|          0|
|   49|N3172FJ-2|     1|   45.55| 8115.32|  8.6801|8762.43|2020-04-29 08:49:...|2020-04-29|     

## Notebook上でのビジュアライズ
display() の出力でChartを選択し、オプションを設定することでグラフを表示可能です。

In [13]:
# Check: render the curated DataFrame with the notebook's display()
display(df_curated)

## Parquetファイルで保存
加工後の結果をファイルで保存することで、DWHへのロード以外の用途での再利用が可能です。

In [14]:
# Destination folder for the curated data
curated_path = f"abfss://datalake@{account_name}.dfs.core.windows.net/curated/sensor_spark/"

# Write the curated DataFrame as Parquet, overwriting any existing output
df_curated.write.mode("overwrite").parquet(curated_path)


## 外部SparkTableの作成
ストレージ上のParquetファイルを利用したSparkテーブルを作成します。


In [21]:
# Remove any previous definition so the cell can be re-run safely.
drop_table_sql = """
    DROP TABLE IF EXISTS sensortablespark
  """
spark.sql(drop_table_sql)

# Define the table on top of the Parquet files stored at curated_path.
create_table_template = """
    CREATE TABLE sensortablespark
    USING Parquet
    LOCATION '{}'
"""
spark.sql(create_table_template.format(curated_path))

DataFrame[]

## マジックコマンドによる開発言語選択
セル単位で既定以外の言語選択が可能です。
- %%pyspark →Python利用
- %%spark →Scala利用
- %%sql →SQL利用

In [22]:
%%sql
-- Preview the first 10 rows of the Spark table
SELECT *
FROM sensortablespark
LIMIT 10