### Step #1 - Read The CSV File

In [0]:
# Storage account that hosts the blob containers. It must match the account
# segment of the wasbs:// URLs used by this notebook
# (wasbs://<container>@<account>.blob.core.windows.net/...), otherwise the
# access key is registered under a config entry Spark never looks up.
storage_account_name = "cognitivoaistg"

# Never paste the access key into the notebook source: read it from a
# Databricks secret scope so it is not stored or shown in clear text.
# NOTE(review): the scope and key names are placeholders - point them at the
# secret that actually holds this storage account's access key.
storage_account_access_key = dbutils.secrets.get(
    scope="storage",
    key="cognitivoaistg-access-key",
)

In [0]:
# Register the storage-account access key in the Spark session configuration
# so that wasbs:// paths on that account can be read and written.
account_key_conf = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
spark.conf.set(account_key_conf, storage_account_access_key)

In [0]:
# Where the raw CSV lives in blob storage, and which reader format to use
file_location = "wasbs://input@cognitivoaistg.blob.core.windows.net/load.csv"
file_type = "csv"

# Reader settings: infer column types, treat the first row as a header,
# and split fields on commas
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# These options only apply to CSV input; other file types ignore them.
csv_reader = spark.read.format(file_type).options(
    inferSchema=infer_schema,
    header=first_row_is_header,
    sep=delimiter,
)
df = csv_reader.load(file_location)

display(df)

id,name,email,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03 18:47:01.954752,2018-03-03 18:47:01.954752
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03 18:47:01.954752,2018-04-14 17:09:48.558151
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21 20:21:24.364752,2018-04-21 20:21:24.364752
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19 04:07:06.854752,2018-05-19 04:07:06.854752
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03 18:47:01.954752,2018-05-23 10:13:59.594752
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19 04:07:06.854752,2018-05-19 05:08:07.964752


### Step #2 - Defining Schema for data type

Declare the schema.

This is just a list of field names and data types.

In [0]:
# Import only the type classes the schema uses (instead of a wildcard import)
# so it is clear where each name comes from and nothing else is shadowed.
from pyspark.sql.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Explicit schema for load.csv: one StructField per column, in file order,
# giving the column name, its data type, and the declared nullability (False).
csvSchema = StructType([
  StructField("id", IntegerType(), False),
  StructField("email", StringType(), False),
  StructField("name", StringType(), False),
  StructField("phone", StringType(), False),
  StructField("address", StringType(), False),
  StructField("age", IntegerType(), False),
  StructField("create_date", TimestampType(), False),
  StructField("update_date", TimestampType(), False)
])

### Step #3 - Reading data with types mapped

In [0]:
# File location and type
file_location = "wasbs://input@cognitivoaistg.blob.core.windows.net/load.csv"
file_type = "csv"

# CSV options
first_row_is_header = "true"
delimiter = ","

# Column names and types come from the explicit csvSchema, so schema inference
# is deliberately NOT requested here: asking Spark to infer types while also
# supplying a schema is contradictory, and the supplied schema is what is used.
# The header option is still needed so the first line is skipped rather than
# parsed as a data row.
df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .schema(csvSchema)
    .load(file_location)
)

display(df)

id,email,name,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-03-03T18:47:01.954+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-04-14T17:09:48.558+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T04:07:06.854+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000


### Step #4 - Converting to parquet data file

In [0]:
# Persist the typed DataFrame as parquet, replacing any previous output.
df.write.parquet(
    "wasbs://output@cognitivoaistg.blob.core.windows.net/load.parquet",
    mode="overwrite",
)

### Step #4 (continued) - Reading parquet data format

In [0]:
# Show the files written under the parquet output directory.
parquet_listing = dbutils.fs.ls("wasbs://output@cognitivoaistg.blob.core.windows.net/load.parquet")
display(parquet_listing)

path,name,size
wasbs://output@cognitivoaistg.blob.core.windows.net/load.parquet/_SUCCESS,_SUCCESS,0
wasbs://output@cognitivoaistg.blob.core.windows.net/load.parquet/_committed_215954761722856117,_committed_215954761722856117,123
wasbs://output@cognitivoaistg.blob.core.windows.net/load.parquet/_started_215954761722856117,_started_215954761722856117,0
wasbs://output@cognitivoaistg.blob.core.windows.net/load.parquet/part-00000-tid-215954761722856117-b91574fa-c82a-48ab-8cb2-11c01bd8d8d8-130-1-c000.snappy.parquet,part-00000-tid-215954761722856117-b91574fa-c82a-48ab-8cb2-11c01bd8d8d8-130-1-c000.snappy.parquet,2808


In [0]:
# Load the parquet dataset written in the previous step back into a DataFrame.
usersDF = spark.read.format("parquet").load(
    "wasbs://output@cognitivoaistg.blob.core.windows.net/load.parquet"
)
display(usersDF)

id,email,name,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-03-03T18:47:01.954+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-04-14T17:09:48.558+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T04:07:06.854+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000


### Step #5 - Deduplicating rows

In [0]:
# Sort by id, then update_date, both descending, so the most recently
# updated row of each id is listed first.
sort_columns = ['id', 'update_date']
users_orderbyDF = usersDF.sort(sort_columns, ascending=False)
display(users_orderbyDF)

id,email,name,phone,address,age,create_date,update_date
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T04:07:06.854+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-04-14T17:09:48.558+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-03-03T18:47:01.954+0000


In [0]:
# Deduplicate: keep exactly one row per (id, create_date) - the one with the
# latest update_date.
#
# drop_duplicates() does not promise which row of a duplicate group survives,
# even when the input was sorted beforehand, so relying on the earlier
# orderBy can silently keep a stale record. A window with row_number() makes
# the choice explicit and deterministic.
from pyspark.sql import Window
from pyspark.sql import functions as F

# Rank the rows of each (id, create_date) group from newest to oldest update.
# If two rows share the same update_date, which of them ranks first is arbitrary.
latest_first = (
    Window
    .partitionBy("id", "create_date")
    .orderBy(F.col("update_date").desc())
)

usersFinalDF = (
    users_orderbyDF
    .withColumn("_row_rank", F.row_number().over(latest_first))
    .filter(F.col("_row_rank") == 1)   # rank 1 = most recent update
    .drop("_row_rank")                 # helper column is not part of the output
)
display(usersFinalDF)

id,email,name,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000


### Step #6 - Write data output

In [0]:
# Write the deduplicated users as parquet, replacing any previous output.
usersFinalDF.write.parquet(
    "wasbs://output@cognitivoaistg.blob.core.windows.net/usersFinal.parquet",
    mode="overwrite",
)

### Step #7 - Read final data output

In [0]:
# Read the final parquet output back to check what was written.
usersFinalDataDF = spark.read.format("parquet").load(
    "wasbs://output@cognitivoaistg.blob.core.windows.net/usersFinal.parquet"
)
display(usersFinalDataDF)

id,email,name,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000
