# Python ETL Notebook

In [1]:
# Path of the combined Excel workbook in the default lakehouse; each
# pd.read_excel call in this notebook loads one sheet from this file.
api_file_path = "/lakehouse/default/Files/Vaccination New Files/Combined Data.xlsx"

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 3, Finished, Available, Finished)

In [2]:
import pandas as pd

# Read the country / WHO-region reference sheet into pandas and preview
# the first five rows as the cell output.
who_regions = pd.read_excel(
    io=api_file_path,
    sheet_name="Country and WHO Region Details",
)
who_regions.head(5)

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 4, Finished, Available, Finished)

Unnamed: 0,ISO Codes,Country,Region Number,Region Catrgory,all.region,all.sub-region,all.intermediate-region,WHO Regions
0,AFG,Afghanistan,7.0,D,Asia,Southern Asia,,EMR
1,ABW,Aruba,,,Americas,Latin America and the Caribbean,Caribbean,AMR
2,AGO,Angola,1.0,D,Africa,Sub-Saharan Africa,Middle Africa,AFR
3,ALB,Albania,9.0,B,Europe,Southern Europe,,EUR
4,DZA,Algeria,1.0,D,Africa,Northern Africa,,AFR


In [3]:
# Summarise column names, non-null counts and dtypes of the region sheet
# (used to decide the Spark schema defined in the next cell).
who_regions.info()

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 5, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 249 entries, 0 to 248
Data columns (total 8 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   ISO Codes                249 non-null    object 
 1   Country                  249 non-null    object 
 2   Region Number            192 non-null    float64
 3   Region Catrgory          192 non-null    object 
 4   all.region               247 non-null    object 
 5   all.sub-region           247 non-null    object 
 6   all.intermediate-region  105 non-null    object 
 7   WHO Regions              221 non-null    object 
dtypes: float64(1), object(7)
memory usage: 15.7+ KB


In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Spark schema for the "Country and WHO Region Details" sheet, listed in
# sheet column order. Field names must match the sheet headers exactly,
# including the source spelling "Region Catrgory". Every field is nullable.
who_region_schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("ISO Codes", StringType()),
        ("Country", StringType()),
        ("Region Number", IntegerType()),  # float64 with gaps in pandas, stored as integer
        ("Region Catrgory", StringType()),
        ("all.region", StringType()),
        ("all.sub-region", StringType()),
        ("all.intermediate-region", StringType()),
        ("WHO Regions", StringType()),
    )
])

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 6, Finished, Available, Finished)

In [5]:
# Loading into spark dataframe
# "Region Number" is float64 in pandas only because it contains missing
# values, while the schema declares it as IntegerType. Cast it to pandas'
# nullable Int64 on a copy first so the float -> integer conversion is
# explicit: astype("Int64") raises if a value is not a whole number and keeps
# missing entries as nulls, instead of relying on whatever implicit
# conversion createDataFrame applies. The original pandas frame is left
# unmodified so this cell can be re-run safely.
who_regions_typed = who_regions.assign(
    **{"Region Number": who_regions["Region Number"].astype("Int64")}
)
who_region_spark = spark.createDataFrame(who_regions_typed, schema= who_region_schema)

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 7, Finished, Available, Finished)

In [6]:
# Print the Spark schema to confirm the DataFrame picked up the declared
# field names, types and nullability.
who_region_spark.printSchema()

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 8, Finished, Available, Finished)

root
 |-- ISO Codes: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Region Number: integer (nullable = true)
 |-- Region Catrgory: string (nullable = true)
 |-- all.region: string (nullable = true)
 |-- all.sub-region: string (nullable = true)
 |-- all.intermediate-region: string (nullable = true)
 |-- WHO Regions: string (nullable = true)



In [7]:
# Render the Spark DataFrame in the notebook for a visual check before it
# is written to a table.
display(who_region_spark)

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 9, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c470571e-d2ef-4feb-9040-a32d64e23b54)

In [11]:

# The sheet headers contain spaces, dots and hyphens; give every column an
# underscore-only name before saving it as a Delta table. The mapping is
# source header -> table column, in output order.
who_region_renames = {
    "ISO Codes": "ISO_Codes",
    "Country": "Country",
    "Region Number": "Region_Number",
    "Region Catrgory": "Region_Catrgory",
    "all.region": "all_region",
    "all.sub-region": "all_sub_region",
    "all.intermediate-region": "all_intermediate_region",
    "WHO Regions": "WHO_Regions",
}
who_region_spark_clean = who_region_spark.selectExpr(
    *[f"`{source}` as {target}" for source, target in who_region_renames.items()]
)

# Overwrite the Who_Regions lakehouse table with the renamed data.
who_region_spark_clean.write.saveAsTable("Who_Regions", format="delta", mode="overwrite")

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 13, Finished, Available, Finished)

In [12]:
# Read the "Vaccine Schedule" sheet into pandas and preview the first five rows.
Vaccine_Schedule = pd.read_excel(
    io=api_file_path,
    sheet_name="Vaccine Schedule",
)
Vaccine_Schedule.head(5)

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 14, Finished, Available, Finished)

Unnamed: 0,ISO_3_CODE,COUNTRYNAME,YEAR,VACCINECODE,VACCINE_DESCRIPTION,SCHEDULEROUNDS,TARGETPOP,TARGETPOP_DESCRIPTION,GEOAREA,AGEADMINISTERED,SOURCECOMMENT,WHO Region
0,ABW,Aruba,2024,DTAPHIBIPV,DTaP-Hib-IPV (acellular) vaccine,1,,General/routine,NATIONAL,M2,,AMR
1,ABW,Aruba,2024,DTAPHIBIPV,DTaP-Hib-IPV (acellular) vaccine,2,,General/routine,NATIONAL,M4,,AMR
2,ABW,Aruba,2024,DTAPHIBIPV,DTaP-Hib-IPV (acellular) vaccine,3,,General/routine,NATIONAL,M6,,AMR
3,ABW,Aruba,2024,DTAPHIBIPV,DTaP-Hib-IPV (acellular) vaccine,4,B_2YL_W,General/routine,NATIONAL,M15,,AMR
4,ABW,Aruba,2024,DTAPIPV,DTaP-IPV (acellular) vaccine,5,B_CHILD_W,General/routine,NATIONAL,Y4,,AMR


In [13]:
# Summarise column names, non-null counts and dtypes of the vaccine
# schedule sheet.
Vaccine_Schedule.info()

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 15, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9064 entries, 0 to 9063
Data columns (total 12 columns):
 #   Column                 Non-Null Count  Dtype 
---  ------                 --------------  ----- 
 0   ISO_3_CODE             9064 non-null   object
 1   COUNTRYNAME            9064 non-null   object
 2   YEAR                   9064 non-null   int64 
 3   VACCINECODE            9064 non-null   object
 4   VACCINE_DESCRIPTION    9064 non-null   object
 5   SCHEDULEROUNDS         9064 non-null   int64 
 6   TARGETPOP              4550 non-null   object
 7   TARGETPOP_DESCRIPTION  9064 non-null   object
 8   GEOAREA                9038 non-null   object
 9   AGEADMINISTERED        7384 non-null   object
 10  SOURCECOMMENT          6156 non-null   object
 11  WHO Region             9064 non-null   object
dtypes: int64(2), object(10)
memory usage: 849.9+ KB


In [14]:

# Relabel "WHO Region" as "WHO_Region" (no space) on the existing frame so
# the header is usable as a Delta column name; all other labels are kept.
Vaccine_Schedule.columns = [
    "WHO_Region" if label == "WHO Region" else label
    for label in Vaccine_Schedule.columns
]


StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 16, Finished, Available, Finished)

In [15]:
# Spark schema for the vaccine schedule data, in sheet column order.
# Uses the pyspark.sql.types names imported earlier in the notebook.
# The last field uses the renamed header "WHO_Region" (no space).
vaccine_schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("ISO_3_CODE", StringType()),
        ("COUNTRYNAME", StringType()),
        ("YEAR", IntegerType()),
        ("VACCINECODE", StringType()),
        ("VACCINE_DESCRIPTION", StringType()),
        ("SCHEDULEROUNDS", IntegerType()),
        ("TARGETPOP", StringType()),
        ("TARGETPOP_DESCRIPTION", StringType()),
        ("GEOAREA", StringType()),
        ("AGEADMINISTERED", StringType()),
        ("SOURCECOMMENT", StringType()),
        ("WHO_Region", StringType()),
    )
])

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 17, Finished, Available, Finished)

In [16]:
# Build the Spark DataFrame from the pandas frame using the explicit schema.
vaccine_spark_df = spark.createDataFrame(
    data=Vaccine_Schedule,
    schema=vaccine_schema,
)

# Overwrite the Vaccine_Schedule lakehouse table (Delta format).
vaccine_spark_df.write.saveAsTable("Vaccine_Schedule", format="delta", mode="overwrite")

StatementMeta(, c3b9291e-a560-4110-8973-e67483722450, 18, Finished, Available, Finished)

In [2]:
import pandas as pd

# Read the "Vaccine Introduction" sheet into pandas and preview the first
# five rows.
Vaccine_Introduction = pd.read_excel(
    io=api_file_path,
    sheet_name="Vaccine Introduction",
)
Vaccine_Introduction.head(5)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 4, Finished, Available, Finished)

Unnamed: 0,ISO_3_CODE,COUNTRYNAME,YEAR,DESCRIPTION,INTRO,WHO Region
0,AFG,Afghanistan,2024,aP (acellular pertussis) vaccine,No,EMR
1,AFG,Afghanistan,2024,Hepatitis A vaccine,No,EMR
2,AFG,Afghanistan,2024,Hepatitis B vaccine,Yes,EMR
3,AFG,Afghanistan,2024,HepB birth dose,Yes,EMR
4,AFG,Afghanistan,2024,Hib (Haemophilus influenzae type B) vaccine,Yes,EMR


In [4]:
# Summarise column names, non-null counts and dtypes of the vaccine
# introduction sheet.
Vaccine_Introduction.info()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 6, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 137945 entries, 0 to 137944
Data columns (total 6 columns):
 #   Column       Non-Null Count   Dtype 
---  ------       --------------   ----- 
 0   ISO_3_CODE   137945 non-null  object
 1   COUNTRYNAME  137945 non-null  object
 2   YEAR         137945 non-null  int64 
 3   DESCRIPTION  137945 non-null  object
 4   INTRO        137945 non-null  object
 5   WHO Region   137945 non-null  object
dtypes: int64(1), object(5)
memory usage: 6.3+ MB


In [5]:

# Replace spaces with underscores in every column label of the pandas frame
# (e.g. "WHO Region" -> "WHO_Region") so the names match the Spark schema.
Vaccine_Introduction.columns = Vaccine_Introduction.columns.map(
    lambda label: label.replace(" ", "_")
)


StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 7, Finished, Available, Finished)

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Spark schema for the vaccine introduction data, in sheet column order;
# every field is nullable.
Vaccine_Introduction_Schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("ISO_3_CODE", StringType()),
        ("COUNTRYNAME", StringType()),
        ("YEAR", IntegerType()),
        ("DESCRIPTION", StringType()),
        ("INTRO", StringType()),
        ("WHO_Region", StringType()),
    )
])

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 8, Finished, Available, Finished)

In [7]:
# Build the Spark DataFrame from pandas with the explicit schema, then print
# the resulting schema as a check.
Vaccine_Introduction_Spark = spark.createDataFrame(
    data=Vaccine_Introduction,
    schema=Vaccine_Introduction_Schema,
)
Vaccine_Introduction_Spark.printSchema()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 9, Finished, Available, Finished)

root
 |-- ISO_3_CODE: string (nullable = true)
 |-- COUNTRYNAME: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- INTRO: string (nullable = true)
 |-- WHO_Region: string (nullable = true)



In [8]:
# Overwrite the Vaccine_Introduction lakehouse table (Delta format).
Vaccine_Introduction_Spark.write.saveAsTable(
    "Vaccine_Introduction",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 10, Finished, Available, Finished)

In [9]:
# Read the "Reported Cases" sheet into pandas and preview the first five rows.
Reported_Cases = pd.read_excel(
    io=api_file_path,
    sheet_name="Reported Cases",
)
Reported_Cases.head(5)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 11, Finished, Available, Finished)

Unnamed: 0,GROUP,CODE,NAME,YEAR,DISEASE,DISEASE_DESCRIPTION,CASES
0,COUNTRIES,ABW,Aruba,2024,CRS,Congenital rubella syndrome,0.0
1,COUNTRIES,ABW,Aruba,2024,DIPHTHERIA,Diphtheria,0.0
2,COUNTRIES,ABW,Aruba,2024,INVASIVE_MENING,Invasive meningococcal disease,0.0
3,COUNTRIES,ABW,Aruba,2024,MEASLES,Measles,0.0
4,COUNTRIES,ABW,Aruba,2024,MUMPS,Mumps,2.0


In [10]:
# Summarise column names, non-null counts and dtypes of the reported cases
# sheet.
Reported_Cases.info()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 12, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 87641 entries, 0 to 87640
Data columns (total 7 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   GROUP                87641 non-null  object 
 1   CODE                 87641 non-null  object 
 2   NAME                 87641 non-null  object 
 3   YEAR                 87641 non-null  int64  
 4   DISEASE              87641 non-null  object 
 5   DISEASE_DESCRIPTION  87641 non-null  object 
 6   CASES                67637 non-null  float64
dtypes: float64(1), int64(1), object(5)
memory usage: 4.7+ MB


In [11]:
# Spark schema for the reported cases data, in sheet column order; every
# field is nullable. CASES is float64 with missing values in pandas and is
# declared here as an integer.
Reported_Cases_Schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("GROUP", StringType()),
        ("CODE", StringType()),
        ("NAME", StringType()),
        ("YEAR", IntegerType()),
        ("DISEASE", StringType()),
        ("DISEASE_DESCRIPTION", StringType()),
        ("CASES", IntegerType()),
    )
])

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 13, Finished, Available, Finished)

In [12]:
# Loading into spark
# CASES is float64 in pandas only because it contains missing values, while
# the schema declares it as IntegerType. Cast it to pandas' nullable Int64 on
# a copy first so the float -> integer conversion is explicit:
# astype("Int64") raises if a value is not a whole number and keeps missing
# entries as nulls, instead of relying on whatever implicit conversion
# createDataFrame applies. The original pandas frame is left unmodified so
# this cell can be re-run safely.
Reported_Cases_typed = Reported_Cases.assign(
    CASES=Reported_Cases["CASES"].astype("Int64")
)
Reported_Cases_Spark = spark.createDataFrame(Reported_Cases_typed, schema= Reported_Cases_Schema)
# Printing Schema
Reported_Cases_Spark.printSchema()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 14, Finished, Available, Finished)

root
 |-- GROUP: string (nullable = true)
 |-- CODE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- DISEASE: string (nullable = true)
 |-- DISEASE_DESCRIPTION: string (nullable = true)
 |-- CASES: integer (nullable = true)



In [13]:
# Overwrite the Reported_Cases lakehouse table (Delta format).
Reported_Cases_Spark.write.saveAsTable(
    "Reported_Cases",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 15, Finished, Available, Finished)

In [14]:
# Read the "Other Indicators" sheet into pandas and preview the first five rows.
Other_Indicators = pd.read_excel(
    io=api_file_path,
    sheet_name="Other Indicators",
)
Other_Indicators.head(5)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 16, Finished, Available, Finished)

Unnamed: 0,ISO_3_CODE,COUNTRYNAME,YEAR,INDCODE,DESCRIPTION,INDCAT_DESCRIPTION,INDSORT,VALUE,WHO Region
0,ABW,Aruba,2024,HBR_AVAILABLE_HBR_AVC,Are adult vaccination card home-based records ...,Home-based records,1809,Yes,AMR
1,ABW,Aruba,2024,SCHOOL_DELIVERED,Are any routine doses of vaccines on the natio...,School vaccination,508,Yes,AMR
2,ABW,Aruba,2024,INTEGRATION_YES_ANSWER_INTEGRATION_CATCHUP,Are catch-up of other vaccines in other popula...,Integrated services,2034,No,AMR
3,ABW,Aruba,2024,HBR_AVAILABLE_HBR_CEVPC,Are child expanded vaccination-plus card home-...,Home-based records,1811,No,AMR
4,ABW,Aruba,2024,HBR_AVAILABLE_HBR_CHB,Are child health booklet home-based records cu...,Home-based records,1813,Yes,AMR


In [15]:
# Summarise column names, non-null counts and dtypes of the other
# indicators sheet.
Other_Indicators.info()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 17, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 937833 entries, 0 to 937832
Data columns (total 9 columns):
 #   Column              Non-Null Count   Dtype 
---  ------              --------------   ----- 
 0   ISO_3_CODE          937833 non-null  object
 1   COUNTRYNAME         937833 non-null  object
 2   YEAR                937833 non-null  int64 
 3   INDCODE             937833 non-null  object
 4   DESCRIPTION         937833 non-null  object
 5   INDCAT_DESCRIPTION  937833 non-null  object
 6   INDSORT             937833 non-null  int64 
 7   VALUE               632596 non-null  object
 8   WHO Region          937833 non-null  object
dtypes: int64(2), object(7)
memory usage: 64.4+ MB


In [16]:

# Replace spaces with underscores in every column label, in place
# (e.g. "WHO Region" -> "WHO_Region"), so the names match the Spark schema.
Other_Indicators.rename(
    columns=lambda label: label.replace(" ", "_"),
    inplace=True,
)


StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 18, Finished, Available, Finished)

In [17]:
# Spark schema for the other indicators data, in sheet column order; every
# field is nullable.
Other_Indicators_schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("ISO_3_CODE", StringType()),
        ("COUNTRYNAME", StringType()),
        ("YEAR", IntegerType()),
        ("INDCODE", StringType()),
        ("DESCRIPTION", StringType()),
        ("INDCAT_DESCRIPTION", StringType()),
        ("INDSORT", IntegerType()),
        ("VALUE", StringType()),
        ("WHO_Region", StringType()),
    )
])


StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 19, Finished, Available, Finished)

In [18]:
# Build the Spark DataFrame from pandas with the explicit schema, then print
# the resulting schema as a check.
Other_Indicators_spark = spark.createDataFrame(
    data=Other_Indicators,
    schema=Other_Indicators_schema,
)
Other_Indicators_spark.printSchema()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 20, Finished, Available, Finished)

root
 |-- ISO_3_CODE: string (nullable = true)
 |-- COUNTRYNAME: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- INDCODE: string (nullable = true)
 |-- DESCRIPTION: string (nullable = true)
 |-- INDCAT_DESCRIPTION: string (nullable = true)
 |-- INDSORT: integer (nullable = true)
 |-- VALUE: string (nullable = true)
 |-- WHO_Region: string (nullable = true)



In [19]:
# Overwrite the Other_Indicators lakehouse table (Delta format).
Other_Indicators_spark.write.saveAsTable(
    "Other_Indicators",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 21, Finished, Available, Finished)

In [20]:
# Read the "Incidence Rate" sheet into pandas and preview the first five rows.
Incidence_Rate = pd.read_excel(
    io=api_file_path,
    sheet_name="Incidence Rate",
)
Incidence_Rate.head(5)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 22, Finished, Available, Finished)

Unnamed: 0,GROUP,CODE,NAME,YEAR,DISEASE,DISEASE_DESCRIPTION,DENOMINATOR,INCIDENCE_RATE
0,COUNTRIES,ABW,Aruba,2024,CRS,Congenital rubella syndrome,"per 10,000 live births",0.0
1,COUNTRIES,ABW,Aruba,2024,DIPHTHERIA,Diphtheria,"per 1,000,000 total population",0.0
2,COUNTRIES,ABW,Aruba,2024,INVASIVE_MENING,Invasive meningococcal disease,"per 1,000,000 total population",0.0
3,COUNTRIES,ABW,Aruba,2024,MEASLES,Measles,"per 1,000,000 total population",0.0
4,COUNTRIES,ABW,Aruba,2024,MUMPS,Mumps,"per 1,000,000 total population",18.5


In [21]:
# Summarise column names, non-null counts and dtypes of the incidence rate
# sheet.
Incidence_Rate.info()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 23, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 87719 entries, 0 to 87718
Data columns (total 8 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   GROUP                87719 non-null  object 
 1   CODE                 87719 non-null  object 
 2   NAME                 87719 non-null  object 
 3   YEAR                 87719 non-null  int64  
 4   DISEASE              87719 non-null  object 
 5   DISEASE_DESCRIPTION  87719 non-null  object 
 6   DENOMINATOR          87719 non-null  object 
 7   INCIDENCE_RATE       63715 non-null  float64
dtypes: float64(1), int64(1), object(6)
memory usage: 5.4+ MB


In [22]:
from pyspark.sql.types import DoubleType

# Spark schema for the incidence rate data, in sheet column order; every
# field is nullable. The pandas object columns map to strings.
incidence_rate_schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("GROUP", StringType()),
        ("CODE", StringType()),
        ("NAME", StringType()),
        ("YEAR", IntegerType()),             # pandas int64 -> Spark IntegerType
        ("DISEASE", StringType()),
        ("DISEASE_DESCRIPTION", StringType()),
        ("DENOMINATOR", StringType()),
        ("INCIDENCE_RATE", DoubleType()),    # pandas float64 -> Spark DoubleType
    )
])


StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 24, Finished, Available, Finished)

In [23]:
# Build the Spark DataFrame from the pandas frame using the explicit schema.
Incidence_Rate_Spark = spark.createDataFrame(
    data=Incidence_Rate,
    schema=incidence_rate_schema,
)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 25, Finished, Available, Finished)

In [24]:
# Print the Spark schema to confirm the DataFrame picked up the declared
# field names, types and nullability.
Incidence_Rate_Spark.printSchema()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 26, Finished, Available, Finished)

root
 |-- GROUP: string (nullable = true)
 |-- CODE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- DISEASE: string (nullable = true)
 |-- DISEASE_DESCRIPTION: string (nullable = true)
 |-- DENOMINATOR: string (nullable = true)
 |-- INCIDENCE_RATE: double (nullable = true)



In [25]:
# Overwrite the Incidence_Rate lakehouse table (Delta format).
Incidence_Rate_Spark.write.saveAsTable(
    "Incidence_Rate",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 27, Finished, Available, Finished)

In [27]:
# Read the "Coverage" sheet into pandas and preview the first five rows.
Coverage = pd.read_excel(
    io=api_file_path,
    sheet_name="Coverage",
)
Coverage.head(5)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 29, Finished, Available, Finished)

Unnamed: 0,CODE,NAME,YEAR,ANTIGEN,ANTIGEN_DESCRIPTION,COVERAGE_CATEGORY,COVERAGE_CATEGORY_DESCRIPTION,TARGET_NUMBER,DOSES,COVERAGE
0,ABW,Aruba,2024,BCG,BCG,ADMIN,Administrative coverage,,,
1,ABW,Aruba,2024,BCG,BCG,OFFICIAL,Official coverage,,,
2,ABW,Aruba,2024,COVID19_CHRONIC_ADULT,COVID-19 vaccine - adults with chronic conditions,ADMIN,Administrative coverage,,,
3,ABW,Aruba,2024,COVID19_FEM,COVID-19 vaccine - female,ADMIN,Administrative coverage,,,
4,ABW,Aruba,2024,COVID19_HW,COVID-19 vaccine - health and care workers,ADMIN,Administrative coverage,,,


In [28]:
# Summarise column names, non-null counts and dtypes of the coverage sheet.
Coverage.info()

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 30, Finished, Available, Finished)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 416327 entries, 0 to 416326
Data columns (total 10 columns):
 #   Column                         Non-Null Count   Dtype  
---  ------                         --------------   -----  
 0   CODE                           416327 non-null  object 
 1   NAME                           415033 non-null  object 
 2   YEAR                           416327 non-null  int64  
 3   ANTIGEN                        416327 non-null  object 
 4   ANTIGEN_DESCRIPTION            416327 non-null  object 
 5   COVERAGE_CATEGORY              416327 non-null  object 
 6   COVERAGE_CATEGORY_DESCRIPTION  416327 non-null  object 
 7   TARGET_NUMBER                  81376 non-null   float64
 8   DOSES                          82059 non-null   float64
 9   COVERAGE                       240036 non-null  float64
dtypes: float64(3), int64(1), object(6)
memory usage: 31.8+ MB


In [None]:

# TARGET_NUMBER and DOSES are float64 with missing values in pandas. Coerce
# each to numeric (unparseable entries become NaN) and store it as nullable
# Int64, which keeps the missing entries as <NA>.
for count_column in ("TARGET_NUMBER", "DOSES"):
    numeric_values = pd.to_numeric(Coverage[count_column], errors="coerce")
    Coverage[count_column] = numeric_values.astype("Int64")


In [29]:
# Spark schema for the coverage data, in sheet column order; every field is
# nullable.
coverage_schema = StructType([
    StructField(column, spark_type, True)
    for column, spark_type in (
        ("CODE", StringType()),
        ("NAME", StringType()),
        ("YEAR", IntegerType()),            # pandas int64 -> Spark IntegerType
        ("ANTIGEN", StringType()),
        ("ANTIGEN_DESCRIPTION", StringType()),
        ("COVERAGE_CATEGORY", StringType()),
        ("COVERAGE_CATEGORY_DESCRIPTION", StringType()),
        ("TARGET_NUMBER", IntegerType()),   # expected to be cast to Int64 on the pandas side
        ("DOSES", IntegerType()),           # expected to be cast to Int64 on the pandas side
        ("COVERAGE", DoubleType()),         # pandas float64 -> Spark DoubleType
    )
])

# Build the Spark DataFrame from the pandas Coverage frame with the schema
# above, then print the resulting schema as a check.
spark_coverage_df = spark.createDataFrame(data=Coverage, schema=coverage_schema)
spark_coverage_df.printSchema()


StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 31, Finished, Available, Finished)

root
 |-- CODE: string (nullable = true)
 |-- NAME: string (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- ANTIGEN: string (nullable = true)
 |-- ANTIGEN_DESCRIPTION: string (nullable = true)
 |-- COVERAGE_CATEGORY: string (nullable = true)
 |-- COVERAGE_CATEGORY_DESCRIPTION: string (nullable = true)
 |-- TARGET_NUMBER: integer (nullable = true)
 |-- DOSES: integer (nullable = true)
 |-- COVERAGE: double (nullable = true)



In [30]:
# Overwrite the Coverage lakehouse table (Delta format).
spark_coverage_df.write.saveAsTable(
    "Coverage",
    format="delta",
    mode="overwrite",
)

StatementMeta(, 8abd50ac-c267-4e27-b951-9d7d420e3644, 32, Finished, Available, Finished)