# Importing dataset to Cosmos DB Azure Table API

To install the Azure Cosmos DB API you need to run:

<b>```pip install azure-cosmosdb-table```</b>

In [None]:
!pip install azure-cosmosdb-table

More info:
<br>https://github.com/Azure/azure-cosmos-python
<br>https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-python
<br>Examples:
<br>https://github.com/MicrosoftDocs/azure-docs/blob/master/articles/cosmos-db/table-storage-how-to-use-python.md


<b>NOTE: in order to run this notebook you are expected to have already created a Azure Storage account and a Cosmos DB account provisioned with the Table API.</b>

### Using Pandas to prepare dataset for import  

We need to have pandas for this example, so let's make sure it is installed by running:  
  
<b>```pip install pandas```</b>


In [None]:
!pip install pandas

In [None]:
import pandas as pd

file = '.\\Employee.csv' #change this if the AdventureWorks folder is not on the same location as the notebook.

df = pd.read_csv(file,header=None,sep='\t', encoding="utf-16") #The file is codified in utf-16

#Lets verify the results
print(df.info(verbose=True))
df.head() #Shows Pandas DataFrame

We are missing the column names since the file did not contain the **headers**.
Hence, lets modify the column names as they are mentioned in the original AdventureWorksOLTP setup script.

In [None]:
df.columns = ["BusinessEntityID",
                "NationalIDNumber",
                "LoginID",
                "OrganizationNode",
                "OrganizationLevel",
                "JobTitle",
                "BirthDate",
                "MaritalStatus",
                "Gender",
                "HireDate",
                "SalariedFlag",
                "VacationHours",
                "SickLeaveHours",
                "CurrentFlag",
                "rowguid",
                "ModifiedDate"]

#Checking the results
df.head()

Now lets see a summary of the values to chose the best collumns for **PartitionKey** (look for more unique values in a column) and **RowKey**. We check for data distribution and we need to think what type of queries we are going to do. 

In [None]:
df.describe(include='all')

JobTitle seems to be a good candidate for partition key and LoginID for row key.<br>
Hence, lets replace the column names accordingly.

In [None]:
df.rename(columns={'JobTitle':'PartitionKey','LoginID':'RowKey'}, inplace= True)

#Confirm results
df.head()

Data Cleansing - The "\\" in the login will break JSON serialization, so we are going to replace it with "_"

In [None]:
df.RowKey = df.RowKey.str.replace("\\","_") #backslash may cause some issues in Json
df.head()

OK, data is ready to be imported!

### Connecting to Cosmos D


In [None]:
import azure.cosmosdb.table.tableservice as ats
from azure.cosmosdb.table import Entity, EntityProperty, EdmType

- Connecting to Cosmos DB

In [None]:
#Connecting to Azure Cosmos DB Table 
the_connection_string = "PRIMARY CONNECTION STRING"
ts= ats.TableService(endpoint_suffix = "table.cosmos.azure.com", connection_string= the_connection_string)

### Create table

In [None]:
table='AWEmployees'
if not ts.exists(table):
    ts.create_table(table)

### Loading table

To make things easier we are only importing a few columns. 

We iterate throught the DataFrame and insert rows

In [None]:
for i in range(df.shape[0]-1):
    row=Entity()
    row.PartitionKey=EntityProperty(EdmType.STRING,df.PartitionKey[i])
    row.RowKey=EntityProperty(EdmType.STRING, df.RowKey[i])
    row.MaritalStatus=EntityProperty(EdmType.STRING,df.MaritalStatus[i])
    row.Gender=EntityProperty(EdmType.STRING,df.Gender[i])
    row.HireDate=EntityProperty(EdmType.STRING,df.HireDate[i])
    row.VacationHours=EntityProperty(EdmType.INT32,int(df.VacationHours[i]))
    row.SickLeaveHours = EntityProperty(EdmType.INT32,int(df.SickLeaveHours[i]))
    
    ts.insert_entity(table,row)

### Querying the table

Query one entity

In [None]:
results = ts.query_entities(table, filter="PartitionKey eq 'Chief Executive Officer'")
for r in results:
    print(f"LoginId={r.RowKey}, HireDate= {r.HireDate}")

Query outputs several entities

In [None]:
results = ts.query_entities(table, filter="HireDate gt '2010-01-01' and HireDate lt '2010-12-31'")
for r in results:
    print(f"LoginId = {r.RowKey}, JobTitle = {r.PartitionKey}, HireDate = {r.HireDate}, VacationHours = {r.VacationHours.value}")

### Updating a record

In [None]:
update_row={"PartitionKey":"Senior Tool Designer","RowKey":"adventure-works_ovidiu0","VacationHours":10}
ts.merge_entity(table, entity=update_row,if_match="*") #if using update, we need to pass all properties. Merge only changes the properties provided

The code in this sample is much less than in than the previous example

In [None]:
results = ts.query_entities(table, filter="PartitionKey eq 'Senior Tool Designer' and RowKey eq 'adventure-works_ovidiu0'")
for r in results:
    print(f"LoginId = {r.RowKey}, JobTitle = {r.PartitionKey}, HireDate = {r.HireDate}, VacationHours = {r.VacationHours}")

### Entity Group Transaction  (ETG)
  
Let's simulate an entity group transaction.  
In this case, two employees agreed to trade some vacation time, so 5 hours will be decreased from one and those hours will be added to the other one.  
**This has the same characteristics of a tipical ACID transaction. (Only with entities that live in the same partition)**

In order to accomplish this we need to update the two employees allowances simultaneously through an ETG.  
In this case this is only possible because the two employess share the same role (i.e. have the same PartitionKey).

In [None]:
emp1=ts.get_entity(table,"Production Technician - WC60","adventure-works_maciej0")
emp2=ts.get_entity(table,"Production Technician - WC60","adventure-works_michael7")

print("---INICIAL BALANCE---")
print(f"Employee's '{emp1.RowKey}' Vacation Hours: {emp1.VacationHours.value}")
print(f"Employee's '{emp2.RowKey}' Vacation Hours: {emp2.VacationHours.value}")

In [None]:
emp1.VacationHours.value = emp1.VacationHours.value-5
emp2.VacationHours.value = emp2.VacationHours.value+5

print(f"Employee's '{emp1.RowKey}' new vacation allowance: {emp1.VacationHours.value}")
print(f"Employee's '{emp2.RowKey}' new vacation allowance: {emp2.VacationHours.value}")



The values above are not yet persisted in backend (it only exists in the notebook)

We use **TableBatch** to define the 2 update operation in only one operation

In [None]:
from azure.cosmosdb.table.tablebatch import TableBatch
batch = TableBatch()

batch.update_entity(emp1,if_match='*') #unconditional update
batch.update_entity(emp2,if_match=emp2.etag) #optimistic concurrency

ts.commit_batch(table, batch)

In [None]:
emp1=ts.get_entity(table,"Production Technician - WC60","adventure-works_maciej0")
emp2=ts.get_entity(table,"Production Technician - WC60","adventure-works_michael7")

print("---FINAL BALANCE---")
print(f"Employee's '{emp1.RowKey}' Vacation Hours: {emp1.VacationHours.value}")
print(f"Employee's '{emp2.RowKey}' Vacation Hours: {emp2.VacationHours.value}")

### Delete

In [None]:
ts.delete_entity(table,'Purchasing Assistant','adventure-works_annette0', if_match="*")

results = ts.query_entities(table, filter="PartitionKey eq 'Purchasing Assistant' and RowKey eq 'adventure-works_annette0'")
print(f"Rows returned:{len(results.items)}")

### Cleanup

In [None]:
ts.delete_table(table)