## initial setup

In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when

In [2]:
# Get (or create) the Spark session for this notebook, named 'nppes'.
# The MySQL Connector/J jar is passed via spark.jars; the file name is given
# without a directory, so it is looked up relative to where the session starts.
spark = (
    SparkSession.builder
    .appName('nppes')
    .config("spark.jars", "mysql-connector-j-8.0.31.jar")
    .getOrCreate()
)

In [3]:
# Read the NPPES dissemination CSV into a DataFrame. The first line of the file
# provides the column names, and Spark infers each column's type from the data.
nppes_csv_path = 'C:\\Other-Programs\\NPPES\\ORIGINAL_NPPES_Data_Dissemination_December_2022\\npidata_pfile_20050523-20221211.csv'
df = spark.read.options(header=True, inferSchema=True).csv(nppes_csv_path)

In [4]:
# Bare expression: the notebook echoes the DataFrame's repr, i.e. the list of
# column names with their inferred types (no rows are shown).
df

DataFrame[NPI: int, Entity Type Code: int, Replacement NPI: int, Employer Identification Number (EIN): string, Provider Organization Name (Legal Business Name): string, Provider Last Name (Legal Name): string, Provider First Name: string, Provider Middle Name: string, Provider Name Prefix Text: string, Provider Name Suffix Text: string, Provider Credential Text: string, Provider Other Organization Name: string, Provider Other Organization Name Type Code: int, Provider Other Last Name: string, Provider Other First Name: string, Provider Other Middle Name: string, Provider Other Name Prefix Text: string, Provider Other Name Suffix Text: string, Provider Other Credential Text: string, Provider Other Last Name Type Code: int, Provider First Line Business Mailing Address: string, Provider Second Line Business Mailing Address: string, Provider Business Mailing Address City Name: string, Provider Business Mailing Address State Name: string, Provider Business Mailing Address Postal Code: strin

In [5]:
# Print the first 20 rows as a text table, truncating long cell values.
# These are show()'s defaults, written out explicitly.
df.show(n=20, truncate=True)

+----------+----------------+---------------+------------------------------------+------------------------------------------------+-------------------------------+-------------------+--------------------+-------------------------+-------------------------+------------------------+--------------------------------+------------------------------------------+------------------------+-------------------------+--------------------------+-------------------------------+-------------------------------+------------------------------+----------------------------------+--------------------------------------------+---------------------------------------------+-------------------------------------------+--------------------------------------------+---------------------------------------------+----------------------------------------------------------------+--------------------------------------------------+--------------------------------------------+------------------------------------------------

In [6]:
# Print the schema as a tree: one line per column with its type and nullability.
df.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- Entity Type Code: integer (nullable = true)
 |-- Replacement NPI: integer (nullable = true)
 |-- Employer Identification Number (EIN): string (nullable = true)
 |-- Provider Organization Name (Legal Business Name): string (nullable = true)
 |-- Provider Last Name (Legal Name): string (nullable = true)
 |-- Provider First Name: string (nullable = true)
 |-- Provider Middle Name: string (nullable = true)
 |-- Provider Name Prefix Text: string (nullable = true)
 |-- Provider Name Suffix Text: string (nullable = true)
 |-- Provider Credential Text: string (nullable = true)
 |-- Provider Other Organization Name: string (nullable = true)
 |-- Provider Other Organization Name Type Code: integer (nullable = true)
 |-- Provider Other Last Name: string (nullable = true)
 |-- Provider Other First Name: string (nullable = true)
 |-- Provider Other Middle Name: string (nullable = true)
 |-- Provider Other Name Prefix Text: string (nullable = true)
 |-- 

In [7]:
# Shorten the headers that carry a parenthetical qualifier.
# Keys are the names as they appear in the CSV; values are the replacements.
column_renames = {
    "Employer Identification Number (EIN)": "EIN",
    "Provider Last Name (Legal Name)": "Provider Last Name",
    "Provider Business Mailing Address Country Code (If outside U.S.)": "Provider Business Mailing Address Country Code",
    "Provider Business Practice Location Address Country Code (If outside U.S.)": "Provider Business Practice Location Address Country Code",
}

# Apply the renames one at a time, in the order listed above.
for old_name, new_name in column_renames.items():
    df = df.withColumnRenamed(old_name, new_name)

# Show the schema so the new names can be checked.
df.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- Entity Type Code: integer (nullable = true)
 |-- Replacement NPI: integer (nullable = true)
 |-- EIN: string (nullable = true)
 |-- Provider Organization Name (Legal Business Name): string (nullable = true)
 |-- Provider Last Name: string (nullable = true)
 |-- Provider First Name: string (nullable = true)
 |-- Provider Middle Name: string (nullable = true)
 |-- Provider Name Prefix Text: string (nullable = true)
 |-- Provider Name Suffix Text: string (nullable = true)
 |-- Provider Credential Text: string (nullable = true)
 |-- Provider Other Organization Name: string (nullable = true)
 |-- Provider Other Organization Name Type Code: integer (nullable = true)
 |-- Provider Other Last Name: string (nullable = true)
 |-- Provider Other First Name: string (nullable = true)
 |-- Provider Other Middle Name: string (nullable = true)
 |-- Provider Other Name Prefix Text: string (nullable = true)
 |-- Provider Other Name Suffix Text: string (nulla

In [8]:
# Replace every space in every column name with an underscore.
#
# toDF() assigns the new names positionally, so the existing names are never
# parsed as column expressions. The previous col(c).alias(...) form resolves
# each name through the expression parser, which fails for names containing
# '.' or backticks unless they are escaped by hand.
df = df.toDF(*[name.replace(' ', '_') for name in df.columns])

# Show the schema so the new names can be checked.
df.printSchema()

root
 |-- NPI: integer (nullable = true)
 |-- Entity_Type_Code: integer (nullable = true)
 |-- Replacement_NPI: integer (nullable = true)
 |-- EIN: string (nullable = true)
 |-- Provider_Organization_Name_(Legal_Business_Name): string (nullable = true)
 |-- Provider_Last_Name: string (nullable = true)
 |-- Provider_First_Name: string (nullable = true)
 |-- Provider_Middle_Name: string (nullable = true)
 |-- Provider_Name_Prefix_Text: string (nullable = true)
 |-- Provider_Name_Suffix_Text: string (nullable = true)
 |-- Provider_Credential_Text: string (nullable = true)
 |-- Provider_Other_Organization_Name: string (nullable = true)
 |-- Provider_Other_Organization_Name_Type_Code: integer (nullable = true)
 |-- Provider_Other_Last_Name: string (nullable = true)
 |-- Provider_Other_First_Name: string (nullable = true)
 |-- Provider_Other_Middle_Name: string (nullable = true)
 |-- Provider_Other_Name_Prefix_Text: string (nullable = true)
 |-- Provider_Other_Name_Suffix_Text: string (nulla

## formatting data

In [9]:
# Bind the renamed frame to a second name, df2, tagged with the alias 'df2'.
# The cells below reassign df2 only, so `df` keeps referring to the full frame.
df2 = df.alias('df2')

In [16]:
# Total number of rows before the license/identifier filter below is applied.
df2.count()

7592843

In [11]:
# Print the first 20 rows as a text table, truncating long cell values.
# These are show()'s defaults, written out explicitly.
df2.show(n=20, truncate=True)

+----------+----------------+---------------+----+------------------------------------------------+------------------+-------------------+--------------------+-------------------------+-------------------------+------------------------+--------------------------------+------------------------------------------+------------------------+-------------------------+--------------------------+-------------------------------+-------------------------------+------------------------------+----------------------------------+--------------------------------------------+---------------------------------------------+-------------------------------------------+--------------------------------------------+---------------------------------------------+----------------------------------------------+--------------------------------------------------+--------------------------------------------+------------------------------------------------------+-------------------------------------------------------+

In [25]:
# Keep only rows where at least one of these columns is non-null:
#   Provider_License_Number_1 .. Provider_License_Number_15
#   Other_Provider_Identifier_Issuer_1 .. Other_Provider_Identifier_Issuer_50
#
# The column names are generated from their numbered pattern instead of being
# listed by hand, so the two ranges are stated once and cannot drift.
license_columns = [f"Provider_License_Number_{i}" for i in range(1, 16)]
issuer_columns = [f"Other_Provider_Identifier_Issuer_{i}" for i in range(1, 51)]
identifier_columns = license_columns + issuer_columns

# OR the per-column "is not null" tests together, left to right, in the same
# order as the column list.
has_any_identifier = col(identifier_columns[0]).isNotNull()
for column_name in identifier_columns[1:]:
    has_any_identifier = has_any_identifier | col(column_name).isNotNull()

df2 = df2.where(has_any_identifier)

In [26]:
# Number of rows remaining after the license/identifier filter.
df2.count()

5247362

## dump to db

In [None]:
# Save the filtered frame (df2) to MySQL over JDBC, replacing the target table
# if it already exists.
#
# - df2 is written rather than df: df is the unfiltered frame, and writing it
#   would leave the filter from the "formatting data" section unused.
# - com.mysql.cj.jdbc.Driver is the driver class name for Connector/J 8.x;
#   the older com.mysql.jdbc.Driver name is deprecated there.
# - Credentials can be supplied through the NPPES_DB_USER / NPPES_DB_PASSWORD
#   environment variables. The fallbacks are the values this cell used before;
#   set the variables for any non-local database.
db_user = os.environ.get("NPPES_DB_USER", "admin")
db_password = os.environ.get("NPPES_DB_PASSWORD", "admin")

(
    df2.write
    .mode("overwrite")
    .format("jdbc")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("url", "jdbc:mysql://localhost:3306/nppes")
    .option("dbtable", "npidata_pfile_20050523_20221211_2")
    .option("user", db_user)
    .option("password", db_password)
    .save()
)