Copyright (c) Microsoft Corporation. All rights reserved.

Licensed under the MIT License.

# Ingest IoT Data
---
Each connected cooler sends hourly data about the products removed from it. This data is received by IoT Hub, then transformed and recorded in 
the IoTInventoryAction table. This notebook can be run on a user-defined cadence (for example, through [Synapse pipelines](https://docs.microsoft.com/en-us/azure/synapse-analytics/get-started-pipelines)) to update the rest of the inventory model with 
the incoming IoT data from the coolers.

## 1.0 Get IoT data

### 1.1 Imports and configuration

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from notebookutils import mssparkutils
import json
# `spark` is the SparkSession provided by the Synapse notebook session.
sc = spark.sparkContext

# Workspace, storage, and database settings.
# Replace the <...> placeholders with the values for your deployment.
synapse_account_name = 'connected-cooler-sa-synapsews'
data_lake_account_name = '<data_lake_account_name>' # Synapse Workspace ADLS
file_system_name = 'connectedcoolersasynfs'
synapse_workspace_name = '<synapse_workspace_name>'
database_name = 'ContosoCoolerDatabase'

# Authenticate to the data lake through the workspace's default storage linked service.
spark.conf.set("spark.storage.synapse.linkedServiceName", f"{synapse_account_name}-WorkspaceDefaultStorage")
spark.conf.set("fs.azure.account.oauth.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

### 1.2 Read in data sets

In [None]:
# Existing picklists and picklist items, read from parquet files in the data lake.
picklist_df = spark.read.parquet(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/{database_name}/picklist')
picklistitem_df = spark.read.parquet(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/{database_name}/picklistitem')
# IoT messages from the coolers, read from the Spark database table.
iotinventoryaction_df = spark.sql(f"SELECT * FROM `{database_name}`.`iotinventoryaction`")
# Alternative: read the raw IoT messages from CSV files instead.
#iotinventoryaction_df = spark.read.option('header', 'true').csv(f'abfss://{file_system_name}@{data_lake_account_name}.dfs.core.windows.net/{database_name}/iotinventoryaction3/*.csv')
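
Both parquet reads above use the same ABFS URI layout, `abfss://<file system>@<account>.dfs.core.windows.net/<database>/<table>`. A small helper, hypothetical and not part of the original notebook, could build these paths in one place so the layout only has to be maintained once. This is a sketch with placeholder account values:

```python
def lake_path(file_system: str, account: str, database: str, table: str) -> str:
    """Build an ADLS Gen2 (abfss) URI for a table folder in the data lake.

    Hypothetical helper: it mirrors the f-strings used in the read cell.
    """
    return f"abfss://{file_system}@{account}.dfs.core.windows.net/{database}/{table}"


# Usage with placeholder values; substitute your own account details.
print(lake_path("connectedcoolersasynfs", "mydatalake", "ContosoCoolerDatabase", "picklist"))
```

In the notebook it would be called as `spark.read.parquet(lake_path(file_system_name, data_lake_account_name, database_name, 'picklist'))`.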

## 2.0 Create Picklists from IoT data
- This task takes the latest data arriving in the IoTInventoryAction table (from the connected coolers) and creates inventory picklists. These picklists are then
processed by code in the inventory_functions notebook, which adjusts each cooler's balance on hand for the particular item. That code should also be scheduled
to run in a pipeline so these adjustments are made periodically.

### 2.1 Get last picklist id and timestamp

In [None]:
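# Rank picklists by id (highest first) and keep the top-ranked row, i.e. the
# most recently created picklist. Filtering on Rank makes the selection explicit
# instead of relying on the row order returned by head().
# Note: head() returns None if the picklist table is empty.
w = Window.orderBy(desc('PickListId'))
last_picklistdata = picklist_df \
    .withColumn('Rank', dense_rank().over(w)) \
    .filter(col('Rank') == 1) \
    .head()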

print(f"id: {last_picklistdata['PickListId']}, ts: {last_picklistdata['PickListFulfilledTimestamp']}")
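
Ranking by `PickListId` in descending order and taking the top-ranked row selects the picklist with the largest id. The plain-Python model below, using hypothetical sample rows, shows the same selection. Like the Spark cell, it assumes the highest id is also the most recently fulfilled picklist; it additionally guards against an empty table, which is an assumption about how a first run should behave.

```python
from datetime import datetime

# Hypothetical sample rows standing in for picklist_df.
picklists = [
    {"PickListId": 1, "PickListFulfilledTimestamp": datetime(2021, 6, 1, 8, 0)},
    {"PickListId": 3, "PickListFulfilledTimestamp": datetime(2021, 6, 1, 10, 0)},
    {"PickListId": 2, "PickListFulfilledTimestamp": datetime(2021, 6, 1, 9, 0)},
]

# Equivalent of dense_rank() over orderBy(desc('PickListId')) filtered to rank 1:
# the row with the maximum PickListId. max() raises ValueError on an empty list,
# so return None when no picklists exist yet.
last_picklist = max(picklists, key=lambda row: row["PickListId"]) if picklists else None

print(f"id: {last_picklist['PickListId']}, ts: {last_picklist['PickListFulfilledTimestamp']}")
```

In Spark, `picklist_df.orderBy(desc('PickListId')).head()` is a shorter way to express the same selection.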

### 2.2 Get all IoT messages after the last load date

In [None]:
# Keep only messages picked strictly after the most recent fulfilled picklist.
iotinventoryaction_update_df = iotinventoryaction_df.filter(
    to_timestamp('PickTime') > to_timestamp(lit(last_picklistdata['PickListFulfilledTimestamp'])))
display(iotinventoryaction_update_df)
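
The filter keeps only messages whose `PickTime` is strictly later than the last fulfilled timestamp, so a message stamped exactly at that time is not loaded again. Below is a plain-Python model with hypothetical rows, assuming `PickTime` arrives as an ISO-8601 string (the Spark cell converts it with `to_timestamp`):

```python
from datetime import datetime

# Hypothetical timestamp of the last fulfilled picklist.
last_fulfilled = datetime(2021, 6, 1, 10, 0)

# Hypothetical IoT messages; PickTime is assumed to be an ISO-8601 string.
iot_messages = [
    {"PickTime": "2021-06-01T09:00:00", "CoolerId": "7", "ItemSku": "A1", "Quantity": 2},
    {"PickTime": "2021-06-01T10:00:00", "CoolerId": "7", "ItemSku": "B2", "Quantity": 1},
    {"PickTime": "2021-06-01T11:00:00", "CoolerId": "8", "ItemSku": "A1", "Quantity": 3},
]

# Keep messages strictly after the last load, mirroring
# to_timestamp('PickTime') > to_timestamp(lit(last_fulfilled)).
new_messages = [
    m for m in iot_messages
    if datetime.fromisoformat(m["PickTime"]) > last_fulfilled
]
print(new_messages)
```

Only the 11:00 message survives; the 10:00 message equals the cutoff and is excluded.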

### 2.3 Add index column incrementing from last value

In [None]:
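# Number the new messages 1..n in PickTime order. An empty partitionBy()
# puts every row in one window, so the numbering is global.
index_window = Window.partitionBy().orderBy(col('PickTime'))

iotinventoryaction_update_df2 = iotinventoryaction_update_df.select(
    row_number().over(index_window).alias('PickListId'),
    'PickTime', 'CoolerId', 'ItemSku', 'Quantity')

# Offset the numbering by the last existing PickListId so new ids continue
# from it, and cast CoolerId to an integer.
iotinventoryaction_update_df3 = iotinventoryaction_update_df2 \
    .withColumn('PickListId', col('PickListId') + last_picklistdata['PickListId']) \
    .withColumn('CoolerId', col('CoolerId').cast(IntegerType()))

iotinventoryaction_update_df3.show()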
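
`row_number()` numbers the new messages 1, 2, 3, ... in `PickTime` order, and adding the last existing `PickListId` shifts that sequence so it continues where the table left off. Because the window has an empty `partitionBy()`, Spark moves all rows into a single partition to compute it, which is acceptable for modest batch sizes but does not scale to very large loads. The arithmetic, modeled in plain Python with hypothetical values:

```python
last_picklist_id = 3  # hypothetical last PickListId already in the table

# Hypothetical new messages, deliberately out of time order.
new_messages = [
    {"PickTime": "2021-06-01T12:00:00", "CoolerId": "8", "ItemSku": "C3", "Quantity": 1},
    {"PickTime": "2021-06-01T11:00:00", "CoolerId": "7", "ItemSku": "A1", "Quantity": 3},
]

# row_number() over orderBy(PickTime) starts at 1; adding the last id continues
# the sequence. ISO-8601 strings sort chronologically. CoolerId is cast to int,
# as in the Spark cell.
indexed = [
    {
        "PickListId": last_picklist_id + n,
        "PickTime": m["PickTime"],
        "CoolerId": int(m["CoolerId"]),
        "ItemSku": m["ItemSku"],
        "Quantity": m["Quantity"],
    }
    for n, m in enumerate(sorted(new_messages, key=lambda row: row["PickTime"]), start=1)
]
print(indexed)
```

The 11:00 message receives id 4 and the 12:00 message id 5.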

### 2.4 Write IoT messages to PickList and PickListItem

In [None]:
# insertInto matches columns by position, not by name, so the selected
# column order must match each target table's schema.
iotinventoryaction_update_df3 \
    .select('PickListId', col('PickTime').alias('PickListFulfilledTimestamp'), 'CoolerId') \
    .write.insertInto(f"{database_name}.picklist")

iotinventoryaction_update_df3 \
    .select('PickListId', 'ItemSku', col('Quantity').alias('ItemQuantity')) \
    .write.insertInto(f"{database_name}.picklistItem")
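
Each IoT message becomes one picklist with a single item: the header columns go to `picklist` and the line columns to `picklistitem`, linked by the shared `PickListId`. Because `DataFrameWriter.insertInto` resolves columns by position rather than by name, the order of the selected columns has to match the target table's column order. A plain-Python model of the split, using a hypothetical indexed row and assumed table column orders:

```python
# Hypothetical output of the indexing step.
indexed = [
    {"PickListId": 4, "PickTime": "2021-06-01T11:00:00", "CoolerId": 7, "ItemSku": "A1", "Quantity": 3},
]

# Tuples preserve column order, which is what insertInto relies on.
# Assumed picklist column order: (PickListId, PickListFulfilledTimestamp, CoolerId)
picklist_rows = [(r["PickListId"], r["PickTime"], r["CoolerId"]) for r in indexed]

# Assumed picklistitem column order: (PickListId, ItemSku, ItemQuantity)
picklistitem_rows = [(r["PickListId"], r["ItemSku"], r["Quantity"]) for r in indexed]

print(picklist_rows)
print(picklistitem_rows)
```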