In [41]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
import json

## Sessão do Spark

In [42]:
# sc = SparkContext()
# spark = SparkSession(sc).builder.appName("Treinamento").getOrCreate()

## Leitura do json

In [43]:
with open('../dataset/data.json') as f:
    json_data = json.dumps(json.load(f))

In [44]:
rddjson = sc.parallelize([json_data])
df = spark.read.json(rddjson)

In [45]:
df.show()

+--------------------+--------------------+------+
|                data|               links|  meta|
+--------------------+--------------------+------+
|[[[[5068536200013...|[https://api.banc...|[1, 1]|
+--------------------+--------------------+------+



## Tratamento da base

In [46]:
df_tratamento = df.select('data.brand.name', 'data.brand.companies')

In [47]:
df_tratamento.show()

+-------------+--------------------+
|         name|           companies|
+-------------+--------------------+
|Organização A|[[50685362000135,...|
+-------------+--------------------+



In [48]:
df_tratamento = df_tratamento.withColumn('companies_exploded', F.explode('companies')).drop('companies')

In [49]:
df_tratamento.show()

+-------------+--------------------+
|         name|  companies_exploded|
+-------------+--------------------+
|Organização A|[50685362000135, ...|
|Organização A|[50685567111135, ...|
+-------------+--------------------+



In [50]:
df_tratamento = df_tratamento.selectExpr(
                        'name as organization',
                        'companies_exploded.name as companie_name',
                        'companies_exploded.cnpjNumber as companie_cnpj',
                        'companies_exploded.urlComplementaryList as companie_urlComplementaryList',
                        'companies_exploded.personalCreditCards as companies_personalCreditCards'
                        )

In [51]:
df_tratamento.show(vertical = True)

-RECORD 0---------------------------------------------
 organization                  | Organização A        
 companie_name                 | Empresa da Organi... 
 companie_cnpj                 | 50685362000135       
 companie_urlComplementaryList | https://empresaa1... 
 companies_personalCreditCards | [[[[[Disponibiliz... 
-RECORD 1---------------------------------------------
 organization                  | Organização A        
 companie_name                 | Empresa da Organi... 
 companie_cnpj                 | 50685567111135       
 companie_urlComplementaryList | https://empresaa2... 
 companies_personalCreditCards | [[[[[Disponibiliz... 



In [52]:
df_tratamento = df_tratamento.withColumn('personalCreditCards_exploded', F.explode('companies_personalCreditCards'))

In [53]:
df_tratamento = df_tratamento.selectExpr(
                        'personalCreditCards_exploded.name as companie_name',
                        'personalCreditCards_exploded.identification.product.type as type_card',
                        'personalCreditCards_exploded.identification.creditCard.network as network',
                        'personalCreditCards_exploded.rewardsProgram.hasRewardProgram',
                        'personalCreditCards_exploded.rewardsProgram.rewardProgramInfo',
                        'personalCreditCards_exploded.fees.services as services',
                        'personalCreditCards_exploded.interest.rates as rates',
                        'personalCreditCards_exploded.interest.instalmentRates as instalmentRates',
                        'personalCreditCards_exploded.interest.otherCredits as otherCredits',
                        'personalCreditCards_exploded.termsConditions.minimumFeeRate as term_minimumFeeRate',
                        'personalCreditCards_exploded.termsConditions.additionalInfo as term_additionalInfo',
                        'personalCreditCards_exploded.termsConditions.elegibilityCriteriaInfo as term_elegibilityCriteriaInfo',
                        'personalCreditCards_exploded.termsConditions.closingProcessInfo as term_closingProcessInfo'

)

In [54]:
df_tratamento.show(truncate = False, vertical = True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 companie_name                | Cartão Universitário                                                                                                                                                                                                                                                                                                                                                                                                       
 type_card                    | CLASSIC_NACIONAL                                                                

In [55]:
df_tratamento = df_tratamento.withColumn('services_exploded', F.explode('services')) \
             .withColumn('rates_exploded', F.explode('rates')) \
             .withColumn('instalmentRates_exploded', F.explode('instalmentRates')) \
             .withColumn('otherCredits_exploded', F.explode('otherCredits')) \
             .drop('services', 'rates', 'instalmentRates', 'otherCredits')

In [56]:
df_tratamento.show(truncate = False, vertical = True)

20.00], [BRL, [0.3500], 2_FAIXA, 35.00], [BRL, [0.2000], 3_FAIXA, 55.00], [BRL, [0.3000], 4_FAIXA, 69.00]]] 
 rates_exploded               | [[[[0.1500], [0.0987], 1_FAIXA], [[0.3500], [0.1600], 2_FAIXA], [[0.2000], [0.3600], 3_FAIXA], [[0.3000], [0.5890], 4_FAIXA]], 0.9000, 0.0845, NA, PRE_FIXADO]                                                                                                                                                                                                                                                           
 instalmentRates_exploded     | [[[[0.1500], [0.0987], 1_FAIXA], [[0.3500], [0.1600], 2_FAIXA], [[0.2000], [0.3600], 3_FAIXA], [[0.3000], [0.5890], 4_FAIXA]], 0.0865, 0.0456, NA, PRE_FIXADO]                                                                                                                                                                                                                                                           
 otherC

In [57]:
df_tratamento = df_tratamento.selectExpr(
                        '*',
                        'services_exploded.name as name_services',
                        'services_exploded.code as code_services',
                        'services_exploded.chargingTriggerInfo as chargingTriggerInfo_services',
                        'services_exploded.prices as prices_services',
                        'services_exploded.minimum.value as value_mini_services',
                        'services_exploded.minimum.currency as currency__mini_services',
                        'services_exploded.maximum.value as value_max_services',
                        'services_exploded.maximum.currency as currency_max_services',
                        'rates_exploded.referentialRateIndexer as referentialRateIndexer_rates',
                        'rates_exploded.rate as rate_rates',
                        'rates_exploded.applications as applications_rates',
                        'rates_exploded.minimumRate as rates_minimumRate',
                        'rates_exploded.maximumRate as rates_maximumRate',
                        'instalmentRates_exploded.referentialRateIndexer as referentialRateIndexer_instalmentRates',
                        'instalmentRates_exploded.rate as rate_instalmentRates',
                        'instalmentRates_exploded.applications as applications_instalmentRates',
                        'instalmentRates_exploded.minimumRate as instalmentRates_minimumRate',
                        'instalmentRates_exploded.maximumRate as instalmentRates_maximumRate',
                        'otherCredits_exploded.code otherCredits'
    
    ).drop('services_exploded', 'rates_exploded', 'instalmentRates_exploded', 'otherCredits_exploded')

In [58]:
df_tratamento.show(vertical = True)

-RECORD 0------------------------------------------------------
 companie_name                          | Cartão Universitário 
 type_card                              | CLASSIC_NACIONAL     
 network                                | VISA                 
 hasRewardProgram                       | true                 
 rewardProgramInfo                      | https://empresaa1... 
 term_minimumFeeRate                    | 0.30                 
 term_additionalInfo                    | NA                   
 term_elegibilityCriteriaInfo           | https://empresaa1... 
 term_closingProcessInfo                | https://empresaa1... 
 name_services                          | ANUIDADE_CARTAO_B... 
 code_services                          | ANUIDADE_NACIONAL    
 chargingTriggerInfo_services           | Disponibilização ... 
 prices_services                        | [[BRL, [0.1500], ... 
 value_mini_services                    | 19.50                
 currency__mini_services                

In [59]:
df_tratamento = df_tratamento.withColumn('prices_services_exploded', F.explode('prices_services')) \
             .withColumn('applications_rates_exploded', F.explode('applications_rates')) \
             .withColumn('applications_instalmentRates_exploded', F.explode('applications_instalmentRates')) \
             .drop('prices_services', 'applications_rates', 'applications_instalmentRates')
             

In [60]:
df_tratamento.show(truncate = False, vertical = True)

                                                                                                                                                     
 network                                | VISA                                                                                                                                                                                             
 hasRewardProgram                       | true                                                                                                                                                                                             
 rewardProgramInfo                      | https://empresaa1.com/credit_cards_rewards                                                                                                                                                       
 term_minimumFeeRate                    | 0.30                                                                                                

In [61]:
df_tratamento = df_tratamento.selectExpr('*',
                        'prices_services_exploded.interval as interval_prices_services',
                        'prices_services_exploded.value as value_prices_services',
                        'prices_services_exploded.currency as currency_prices_services',
                        'prices_services_exploded.customers.rate as rate_customers_prices_services',
                        'applications_rates_exploded.interval as interval_applications_rates',
                        'applications_rates_exploded.indexer.rate as rate_indexer_applications_rates',
                        'applications_rates_exploded.customers.rate as customers_rate_indexer_applications_rates',
                        'applications_instalmentRates_exploded.interval as interval_applications_instalmentRates',
                        'applications_instalmentRates_exploded.indexer.rate as rate_indexer_applications_instalmentRates',
                        'applications_instalmentRates_exploded.customers.rate as customers_indexer_applications_instalmentRates'
                        

).drop('prices_services_exploded', 'applications_rates_exploded', 'applications_instalmentRates_exploded')

In [62]:
df_tratamento.show(truncate = False, vertical = True)

              | 2_FAIXA                                                                                                                                                                                          
 value_prices_services                          | 35.00                                                                                                                                                                                            
 currency_prices_services                       | BRL                                                                                                                                                                                              
 rate_customers_prices_services                 | 0.3500                                                                                                                                                                                           
 interval_applications_rates                    | 1_FAIXA 

In [63]:
df_tratamento.columns

['companie_name',
 'type_card',
 'network',
 'hasRewardProgram',
 'rewardProgramInfo',
 'term_minimumFeeRate',
 'term_additionalInfo',
 'term_elegibilityCriteriaInfo',
 'term_closingProcessInfo',
 'name_services',
 'code_services',
 'chargingTriggerInfo_services',
 'value_mini_services',
 'currency__mini_services',
 'value_max_services',
 'currency_max_services',
 'referentialRateIndexer_rates',
 'rate_rates',
 'rates_minimumRate',
 'rates_maximumRate',
 'referentialRateIndexer_instalmentRates',
 'rate_instalmentRates',
 'instalmentRates_minimumRate',
 'instalmentRates_maximumRate',
 'otherCredits',
 'interval_prices_services',
 'value_prices_services',
 'currency_prices_services',
 'rate_customers_prices_services',
 'interval_applications_rates',
 'rate_indexer_applications_rates',
 'customers_rate_indexer_applications_rates',
 'interval_applications_instalmentRates',
 'rate_indexer_applications_instalmentRates',
 'customers_indexer_applications_instalmentRates']

In [64]:
df_tratamento.repartition(1).write.option("header", "true").option("sep", ";").option("encoding", "UTF-8").mode('overwrite').csv('../dataset/output/Json_tratado.csv')

## Criando estruturas JSON


In [65]:
df_tratamento.show(vertical = True)

      | ANUIDADE_CARTAO_B... 
 code_services                                  | ANUIDADE_NACIONAL    
 chargingTriggerInfo_services                   | Disponibilização ... 
 value_mini_services                            | 19.50                
 currency__mini_services                        | BRL                  
 value_max_services                             | 72.00                
 currency_max_services                          | BRL                  
 referentialRateIndexer_rates                   | PRE_FIXADO           
 rate_rates                                     | NA                   
 rates_minimumRate                              | 0.0845               
 rates_maximumRate                              | 0.9000               
 referentialRateIndexer_instalmentRates         | PRE_FIXADO           
 rate_instalmentRates                           | NA                   
 instalmentRates_minimumRate                    | 0.0456               
 instalmentRates_maximumRate      

In [66]:
df_tratamento.columns

['companie_name',
 'type_card',
 'network',
 'hasRewardProgram',
 'rewardProgramInfo',
 'term_minimumFeeRate',
 'term_additionalInfo',
 'term_elegibilityCriteriaInfo',
 'term_closingProcessInfo',
 'name_services',
 'code_services',
 'chargingTriggerInfo_services',
 'value_mini_services',
 'currency__mini_services',
 'value_max_services',
 'currency_max_services',
 'referentialRateIndexer_rates',
 'rate_rates',
 'rates_minimumRate',
 'rates_maximumRate',
 'referentialRateIndexer_instalmentRates',
 'rate_instalmentRates',
 'instalmentRates_minimumRate',
 'instalmentRates_maximumRate',
 'otherCredits',
 'interval_prices_services',
 'value_prices_services',
 'currency_prices_services',
 'rate_customers_prices_services',
 'interval_applications_rates',
 'rate_indexer_applications_rates',
 'customers_rate_indexer_applications_rates',
 'interval_applications_instalmentRates',
 'rate_indexer_applications_instalmentRates',
 'customers_indexer_applications_instalmentRates']

In [67]:
df_tratamento = df_tratamento.groupBy('companie_name',
 'type_card',
 'network',
 'hasRewardProgram',
 'rewardProgramInfo',
 'term_minimumFeeRate',
 'term_additionalInfo',
 'term_elegibilityCriteriaInfo',
 'term_closingProcessInfo',
 'name_services',
 'code_services',
 'chargingTriggerInfo_services',
 'value_mini_services',
 'currency__mini_services',
 'value_max_services',
 'currency_max_services').agg(F.struct('code_services', 'chargingTriggerInfo_services', 'value_mini_services', 'currency__mini_services', 'value_max_services', 'currency_max_services').alias('code_struct'))



In [68]:
df_tratamento.columns

['companie_name',
 'type_card',
 'network',
 'hasRewardProgram',
 'rewardProgramInfo',
 'term_minimumFeeRate',
 'term_additionalInfo',
 'term_elegibilityCriteriaInfo',
 'term_closingProcessInfo',
 'name_services',
 'code_services',
 'chargingTriggerInfo_services',
 'value_mini_services',
 'currency__mini_services',
 'value_max_services',
 'currency_max_services',
 'code_struct']

In [69]:
df_tratamento.select('code_struct').repartition(1).write.option("encoding", "UTF-8").mode('overwrite').json('../dataset/output/Json_tratado_saida.json')

In [70]:
df_tratamento = df_tratamento.groupBy('companie_name',
 'type_card',
 'network',
 'hasRewardProgram',
 'rewardProgramInfo',
 'term_minimumFeeRate',
 'term_additionalInfo',
 'term_elegibilityCriteriaInfo',
 'term_closingProcessInfo').agg(F.collect_list(F.struct('code_services', 'chargingTriggerInfo_services', 'value_mini_services', 'currency__mini_services', 'value_max_services', 'currency_max_services').alias('code_struct')).alias('services'))

In [73]:
df_tratamento.printSchema()

root
 |-- companie_name: string (nullable = true)
 |-- type_card: string (nullable = true)
 |-- network: string (nullable = true)
 |-- hasRewardProgram: boolean (nullable = true)
 |-- rewardProgramInfo: string (nullable = true)
 |-- term_minimumFeeRate: string (nullable = true)
 |-- term_additionalInfo: string (nullable = true)
 |-- term_elegibilityCriteriaInfo: string (nullable = true)
 |-- term_closingProcessInfo: string (nullable = true)
 |-- services: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- code_services: string (nullable = true)
 |    |    |-- chargingTriggerInfo_services: string (nullable = true)
 |    |    |-- value_mini_services: string (nullable = true)
 |    |    |-- currency__mini_services: string (nullable = true)
 |    |    |-- value_max_services: string (nullable = true)
 |    |    |-- currency_max_services: string (nullable = true)



In [74]:
df_tratamento.select('services').repartition(1).write.option("encoding", "UTF-8").mode('overwrite').json('../dataset/output/Json_lista.json')