In [29]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode,col,hash,expr,split,array
from pyspark.sql.types import ArrayType, IntegerType, ShortType


In [30]:
# Build (or reuse) the Spark session for this notebook, requesting 4g of
# driver memory.
spark = (
    SparkSession.builder
    .appName('network')
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

In [31]:
# Bare expression: the notebook displays the SparkSession object as this
# cell's output.
spark

In [32]:
# Load the rate file. The 'multiline' option lets a single JSON record span
# several lines instead of requiring one record per line.
df_pyspark = (
    spark.read
    .option('multiline', 'true')
    .json('rate.json')
)


                                                                                

In [33]:
# Print the schema Spark inferred from rate.json (no explicit schema was given).
df_pyspark.printSchema()

root
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- billing_code_type_version: string (nullable = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negotiated_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- negotiated_prices: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- additional_information: string (nullable = true)
 |    |    |    |    |-- billing_class: string (nullable = true)
 |    |    |    |    |-- billing_code_modifier: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- expiration_date: string (nullable = true)
 |    |    |    |    |-- negotiated_rate: double (nullable = true)
 |    |    |    |    |-- negotiated_type: string (nullable = true)
 |    |    |    |    |-- service_code: array (nullable = true)
 |    |    |

In [34]:
# Flatten the two nested arrays: first one row per `negotiated_rates` entry,
# then one row per `negotiated_prices` entry inside it.
network_group = df_pyspark.withColumn(
    'new_network',
    explode(col('negotiated_rates')),
)
network_agn = network_group.withColumn(
    'newest_network',
    explode(col('new_network.negotiated_prices')),
)

# Keep the code-level identifiers and lift the per-price fields (plus the
# rate-level provider references) to top-level columns.
network_again = network_agn.select(
    col("billing_code"),
    col("billing_code_type"),
    col("negotiation_arrangement"),
    col('newest_network.billing_class').alias('billing_class'),
    col('newest_network.negotiated_rate').alias('negotiated_rate'),
    col('newest_network.billing_code_modifier').alias('billing_code_modifier'),
    col('newest_network.negotiated_type').alias('negotiated_type'),
    col('newest_network.service_code').alias('service_code'),
    col('new_network.provider_references').alias('provider_group_id'),
)


    
    

In [35]:
# Print the flattened schema to check the selected columns and their types.
network_again.printSchema()

root
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- negotiation_arrangement: string (nullable = true)
 |-- billing_class: string (nullable = true)
 |-- negotiated_rate: double (nullable = true)
 |-- billing_code_modifier: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- negotiated_type: string (nullable = true)
 |-- service_code: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- provider_group_id: array (nullable = true)
 |    |-- element: long (containsNull = true)



### Remove rows with a null billing code


In [36]:
# Discard every row whose billing_code is null; other columns may still
# contain nulls.
removena_bc = network_again.na.drop(subset=['billing_code'])

In [37]:
# Re-type service_code from array<string> to array<int>.
# NOTE(review): leading zeros are lost by this cast (e.g. "01" -> 1), and a
# non-numeric code becomes null or raises depending on Spark's ANSI mode --
# confirm this is intended.
df_cast = removena_bc.withColumn(
    "service_code",
    col("service_code").cast(ArrayType(IntegerType())),
)


In [38]:
# Print the schema to confirm service_code elements are now integers.
df_cast.printSchema()

root
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- negotiation_arrangement: string (nullable = true)
 |-- billing_class: string (nullable = true)
 |-- negotiated_rate: double (nullable = true)
 |-- billing_code_modifier: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- negotiated_type: string (nullable = true)
 |-- service_code: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- provider_group_id: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [39]:
# Persist the flattened, typed rates as Parquet.
# The default save mode raises if the target path already exists, which makes
# this cell fail on every re-run of the notebook. Overwrite explicitly so
# Restart & Run All stays repeatable.
df_cast.write.mode('overwrite').parquet('in_net.parquet')

                                                                                