In [7]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import explode,col,hash,expr,split,array

In [8]:
spark = SparkSession.builder.appName('network').config("spark.driver.memory", "4g").getOrCreate()

In [9]:
spark

In [10]:
df_pyspark = spark.read.option('multiline','true').json('rate.json')


                                                                                

In [11]:
df_pyspark.printSchema()

root
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- billing_code_type_version: string (nullable = true)
 |-- description: string (nullable = true)
 |-- name: string (nullable = true)
 |-- negotiated_rates: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- negotiated_prices: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- additional_information: string (nullable = true)
 |    |    |    |    |-- billing_class: string (nullable = true)
 |    |    |    |    |-- billing_code_modifier: array (nullable = true)
 |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |-- expiration_date: string (nullable = true)
 |    |    |    |    |-- negotiated_rate: double (nullable = true)
 |    |    |    |    |-- negotiated_type: string (nullable = true)
 |    |    |    |    |-- service_code: array (nullable = true)
 |    |    |

In [12]:
network_group = df_pyspark.withColumn('new_network',explode('negotiated_rates'))
network_agn = network_group.withColumn('newest_network',explode('new_network.negotiated_prices'))

network_again = network_agn.select(
    "billing_code","billing_code_type","negotiation_arrangement",
    col('newest_network.billing_class').alias('billing_class'),
    col('newest_network.negotiated_rate').alias('negotiated_rate'),
    
    col('newest_network.billing_code_modifier').alias('billing_code_modifier'),
    col('newest_network.negotiated_type').alias('negotiated_type'),
    col('newest_network.service_code').alias('service_code'),
    col('new_network.provider_references').alias('provider_group_id')
    
)
network_again.show()


    
    

[Stage 3:>                                                          (0 + 1) / 1]

+------------+-----------------+-----------------------+-------------+---------------+---------------------+---------------+------------+--------------------+
|billing_code|billing_code_type|negotiation_arrangement|billing_class|negotiated_rate|billing_code_modifier|negotiated_type|service_code|   provider_group_id|
+------------+-----------------+-----------------------+-------------+---------------+---------------------+---------------+------------+--------------------+
|       PEMG1|              CPT|                    ffs| professional|           70.0|                 NULL|     negotiated|        [11]|[10001001, 100020...|
|       PEMG1|              CPT|                    ffs| professional|           77.0|                 NULL|     negotiated|        [11]|          [10048001]|
|       PEMG1|              CPT|                    ffs| professional|         119.97|                 NULL|     negotiated|        [11]|          [10077001]|
|       PEMG1|              CPT|              

                                                                                

In [13]:
network_again.printSchema()

root
 |-- billing_code: string (nullable = true)
 |-- billing_code_type: string (nullable = true)
 |-- negotiation_arrangement: string (nullable = true)
 |-- billing_class: string (nullable = true)
 |-- negotiated_rate: double (nullable = true)
 |-- billing_code_modifier: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- negotiated_type: string (nullable = true)
 |-- service_code: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- provider_group_id: array (nullable = true)
 |    |-- element: long (containsNull = true)



### removing the null value from the billing code


In [14]:
removena_bc = network_again.dropna(subset=['billing_code'])

In [15]:
removena_bc.show()

[Stage 4:>                                                          (0 + 1) / 1]

+------------+-----------------+-----------------------+-------------+---------------+---------------------+---------------+------------+--------------------+
|billing_code|billing_code_type|negotiation_arrangement|billing_class|negotiated_rate|billing_code_modifier|negotiated_type|service_code|   provider_group_id|
+------------+-----------------+-----------------------+-------------+---------------+---------------------+---------------+------------+--------------------+
|       PEMG1|              CPT|                    ffs| professional|           70.0|                 NULL|     negotiated|        [11]|[10001001, 100020...|
|       PEMG1|              CPT|                    ffs| professional|           77.0|                 NULL|     negotiated|        [11]|          [10048001]|
|       PEMG1|              CPT|                    ffs| professional|         119.97|                 NULL|     negotiated|        [11]|          [10077001]|
|       PEMG1|              CPT|              

                                                                                

### Use hashing for service code column

In [16]:
add_has = removena_bc.withColumn('service_code',hash('service_code'))
add_has.show()
                                 
                            

[Stage 5:>                                                          (0 + 1) / 1]

+------------+-----------------+-----------------------+-------------+---------------+---------------------+---------------+------------+--------------------+
|billing_code|billing_code_type|negotiation_arrangement|billing_class|negotiated_rate|billing_code_modifier|negotiated_type|service_code|   provider_group_id|
+------------+-----------------+-----------------------+-------------+---------------+---------------------+---------------+------------+--------------------+
|       PEMG1|              CPT|                    ffs| professional|           70.0|                 NULL|     negotiated| -1726485189|[10001001, 100020...|
|       PEMG1|              CPT|                    ffs| professional|           77.0|                 NULL|     negotiated| -1726485189|          [10048001]|
|       PEMG1|              CPT|                    ffs| professional|         119.97|                 NULL|     negotiated| -1726485189|          [10077001]|
|       PEMG1|              CPT|              

                                                                                

### column renamed

In [17]:
rename_col = add_has.withColumnRenamed('billing_class','bCIs')\
                .withColumnRenamed('billing_code','bC')\
                .withColumnRenamed('billing_code_type','bCT')\
                .withColumnRenamed('negotiated_rate','negR')\
                .withColumnRenamed('negotiated_type','negT')\
                .withColumnRenamed('negotiation_arrangement','negA')\
                .withColumnRenamed('billing_code_modifier','mdH')\
                .withColumnRenamed('service_code','poSH')

In [18]:
rename_col.show()

[Stage 6:>                                                          (0 + 1) / 1]

+-----+---+----+------------+------+----+----------+-----------+--------------------+
|   bC|bCT|negA|        bCIs|  negR| mdH|      negT|       poSH|   provider_group_id|
+-----+---+----+------------+------+----+----------+-----------+--------------------+
|PEMG1|CPT| ffs|professional|  70.0|NULL|negotiated|-1726485189|[10001001, 100020...|
|PEMG1|CPT| ffs|professional|  77.0|NULL|negotiated|-1726485189|          [10048001]|
|PEMG1|CPT| ffs|professional|119.97|NULL|negotiated|-1726485189|          [10077001]|
|PEMG1|CPT| ffs|professional|  94.5|NULL|negotiated|-1726485189|          [10087001]|
|PEMG1|CPT| ffs|professional|141.51|NULL|negotiated|-1726485189|[10152001, 10153001]|
|PEMG1|CPT| ffs|professional|  84.0|NULL|negotiated|-1726485189|[10187001, 10395001]|
|PEMG1|CPT| ffs|professional| 106.4|NULL|negotiated|-1726485189|          [10259001]|
|PEMG1|CPT| ffs|professional| 108.5|NULL|negotiated|-1726485189|          [10286001]|
|PEMG1|CPT| ffs|professional| 105.0|NULL|negotiated|-1

                                                                                

### to verify

In [19]:
 network_again.filter(network_again.billing_code_modifier.isNotNull()).count()


                                                                                

97508

In [20]:
rename_col.printSchema()

root
 |-- bC: string (nullable = true)
 |-- bCT: string (nullable = true)
 |-- negA: string (nullable = true)
 |-- bCIs: string (nullable = true)
 |-- negR: double (nullable = true)
 |-- mdH: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- negT: string (nullable = true)
 |-- poSH: integer (nullable = false)
 |-- provider_group_id: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [21]:
df_cast = rename_col.withColumn("poSH", array(col("poSH")))


In [22]:
df_cast.printSchema()

root
 |-- bC: string (nullable = true)
 |-- bCT: string (nullable = true)
 |-- negA: string (nullable = true)
 |-- bCIs: string (nullable = true)
 |-- negR: double (nullable = true)
 |-- mdH: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- negT: string (nullable = true)
 |-- poSH: array (nullable = false)
 |    |-- element: integer (containsNull = false)
 |-- provider_group_id: array (nullable = true)
 |    |-- element: long (containsNull = true)



In [23]:
df_cast.toPandas().to_json('in_net.json', orient='records')

                                                                                