In [1]:
from pyspark.sql.types import * 
from pyspark.sql.functions import *
import json

In [2]:
dbutils.fs.mkdirs("/cognitivo/")

In [3]:
#apos criar o diretorio, copei os arquivos do .zip que joguei na pasta local
#dbfs cp . dbfs:/cognitivo/

In [4]:
%fs ls /cognitivo/

path,name,size
dbfs:/cognitivo/config/,config/,0
dbfs:/cognitivo/data/,data/,0
dbfs:/cognitivo/requirements/,requirements/,0
dbfs:/cognitivo/scripts/,scripts/,0


In [5]:
users_file = '/cognitivo/data/input/users/load.csv'

In [6]:
users_df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load(users_file)

In [7]:
users_df.show()

In [8]:
users_df.printSchema()

In [9]:
users_df = users_df.select(col('id'), col('name').alias('email'), col('email').alias('name'), col('phone'), col('address'), col('age'), col('create_date'), col('update_date'))

In [10]:
display(users_df)

id,email,name,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-03-03T18:47:01.954+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-04-14T17:09:48.558+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T04:07:06.854+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000


In [11]:
#2. Deduplicação dos dados convertidos: No conjunto de dados convertidos haverão múltiplas entradas para um mesmo registro, variando apenas os valores de alguns dos campos entre elas. Será necessário realizar um processo de deduplicação destes dados, a fim de apenas manter a última entrada de cada registro, usando como referência o id para identificação dos registros duplicados e a data de atualização (update_date) para definição do registro mais recente;

In [12]:
users_df_reference = users_df.groupBy('id', ).agg({'update_date':  'max'} ).withColumnRenamed('max(update_date)', "update_date")

In [13]:
users_df_reference.show()

In [14]:
condicao = [users_df_reference['id'] == users_df['id'], users_df_reference['update_date'] == users_df['update_date']]

In [15]:
result_2_df = users_df.join(users_df_reference, on=condicao)

In [16]:
display(result_2_df)

id,email,name,phone,address,age,create_date,update_date,id.1,update_date.1
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000,2,2018-04-21T20:21:24.364+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000,1,2018-05-23T10:13:59.594+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000,3,2018-05-19T05:08:07.964+0000


In [17]:
#3. Conversão do tipo dos dados deduplicados: No diretório config haverá um arquivo JSON de configuração (types_mapping.json), contendo os nomes dos campos e os respectivos tipos desejados de output. Utilizando esse arquivo como input, realizar um processo de conversão dos tipos dos campos descritos, no conjunto de dados deduplicados;

In [18]:
types_mapping = '/dbfs/cognitivo/config/types_mapping.json'

In [19]:
with open(types_mapping, "r") as json_file:
  types_dict = json.load(json_file)

In [20]:
print(types_dict)

In [21]:
users_df.printSchema()

In [22]:
types_dict['id'] = 'integer'
types_dict['email'] = 'string'
types_dict['name'] = 'string'
types_dict['phone'] = 'string'
types_dict['address'] = 'string'
types_dict['create_date'] = 'timestamp'

In [23]:
print(types_dict)

In [24]:
for col_name in users_df.columns:
  if col_name in types_dict.keys():
    result3_df = users_df.withColumn(col_name, users_df[col_name].cast(types_dict[col_name]))

In [25]:
display(result3_df)

id,email,name,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-03-03T18:47:01.954+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-04-14T17:09:48.558+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T04:07:06.854+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000


In [26]:
result3_df.printSchema()

In [27]:
#1. Conversão do formato dos arquivos: Converter o arquivo CSV presente no diretório data/input/users/load.csv, para um formato colunar de alta performance de leitura de sua escolha. Justificar brevemente a escolha do formato;

In [28]:
users_df.write.parquet('/cognitivo/data/output/output.parquet')

In [29]:
%fs ls /cognitivo/data/output/output.parquet


path,name,size
dbfs:/cognitivo/data/output/output.parquet/_SUCCESS,_SUCCESS,0
dbfs:/cognitivo/data/output/output.parquet/_committed_7591499190595469551,_committed_7591499190595469551,125
dbfs:/cognitivo/data/output/output.parquet/_started_7591499190595469551,_started_7591499190595469551,0
dbfs:/cognitivo/data/output/output.parquet/part-00000-tid-7591499190595469551-76ba2699-5ceb-4b4d-968a-4bc5b39e5a4b-2035-1-c000.snappy.parquet,part-00000-tid-7591499190595469551-76ba2699-5ceb-4b4d-968a-4bc5b39e5a4b-2035-1-c000.snappy.parquet,2773


In [30]:
parquet_df = sqlContext.read.parquet('/cognitivo/data/output/output.parquet')

In [31]:
display(parquet_df)

id,email,name,phone,address,age,create_date,update_date
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9997,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-03-03T18:47:01.954+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9998,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-04-14T17:09:48.558+0000
2,sherlock.holmes@cognitivo.ai,Sherlock Holmes,(11) 94815-1623,"221B Baker Street, London, UK",34,2018-04-21T20:21:24.364+0000,2018-04-21T20:21:24.364+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 91234-5678,"124 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T04:07:06.854+0000
1,david.lynch@cognitivo.ai,David Lynch,(11) 99999-9999,"Mulholland Drive, Los Angeles, CA, US",72,2018-03-03T18:47:01.954+0000,2018-05-23T10:13:59.594+0000
3,spongebob.squarepants@cognitivo.ai,Spongebob Squarepants,(11) 98765-4321,"122 Conch Street, Bikini Bottom, Pacific Ocean",13,2018-05-19T04:07:06.854+0000,2018-05-19T05:08:07.964+0000


In [32]:
#Utilizei o parquet, porque, primeiramente é colunar e segundo é suportado por varios sistemas de processamento