In [None]:
# Scratch cell for installing packages or running quick experiments.
# TODO: remove this cell from the final version of the notebook.

# `unidecode` is imported by the setup cell further down.
%pip install unidecode
# Optional smoke test once the install has finished:
#from unidecode import unidecode
#print(unidecode("café"))

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [None]:
# Keep cell empty to use as start-run cell

In [None]:
# Run this cell to import libraries, set up URL etc

import os
from decimal import Decimal
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from unidecode import unidecode

# Create a Spark session (Arrow-based pandas conversion is switched off for it)
spark = (
    SparkSession.builder
    .appName("WriteToSQLServer")
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")
    .getOrCreate()
)

# Database connection settings.
# Every value is taken from an environment variable when one is set, so real
# credentials never have to be saved inside the notebook. The second argument
# is only the fallback placeholder and must be replaced if no variable is set.
db_username = os.environ.get('OLIST_DB_USERNAME', 'USER')          # PostgreSQL username
db_password = os.environ.get('OLIST_DB_PASSWORD', 'PASSWORD')      # PostgreSQL password
db_host = os.environ.get('OLIST_DB_HOST', 'HOST.postgres.database.azure.com')  # server host
db_port = int(os.environ.get('OLIST_DB_PORT', '5432'))             # default PostgreSQL port
db_name = os.environ.get('OLIST_DB_NAME', 'DATABASE')              # database name

# The credentials travel in the query string of the JDBC URL, so they are
# percent-encoded: a raw '&', '=', '%' or '+' in the password would otherwise
# be read as URL syntax and break the connection.
db_url = (
    f'jdbc:postgresql://{db_host}:{db_port}/{db_name}'
    f'?user={quote_plus(db_username)}&password={quote_plus(db_password)}&sslmode=require'
)

# Azure Blob storage access setup for writing CSV (same env-var-or-placeholder rule)
storage_account_name = os.environ.get('OLIST_STORAGE_ACCOUNT', "<Your Storage Account Name>")
container_name = os.environ.get('OLIST_STORAGE_CONTAINER', "<Your Container Name>")
storage_account_access_key = os.environ.get('OLIST_STORAGE_KEY', "<Your Access Key>")

# Register the account key with Spark so the wasbs:// writes below can authenticate
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_access_key,
)

# Azure storage URL and folder where the olist dataset is stored (read by pd.read_csv)
folder_name = "olist"
azurl = f"https://{storage_account_name}.blob.core.windows.net/{container_name}/{folder_name}/"

# Folder, relative to the container root, that receives the cleaned CSV output
cleaned_folder = "olist/cleaned/"

1. Data Cleaning for "Customers"
- removed duplicates
- strip leading/trailing whitespace
- handling null values
- convert columns to appropriate data types
- ensuring same character length

Issues
- should duplicated 'customer_unique_id' be removed?
- to convert a column's data type to 'category', the null values have to be filled with 'N/A' first, before the data type conversion

In [None]:
# Read the raw customers CSV from the olist folder in Blob Storage
file_name = 'olist_customers_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw customers frame and return a new, de-duplicated frame.

    Args:
        df: pandas DataFrame with the columns customer_id, customer_unique_id,
            customer_zip_code_prefix, customer_city and customer_state.
            The frame passed in is not modified.

    Returns:
        A new DataFrame without the customer_id column, holding the first row
        of every (customer_unique_id, customer_zip_code_prefix) pair.
    """
    # Work on a private copy so the caller's frame is left untouched
    df = df.copy()

    # Step 1: Strip leading/trailing whitespace from every text column.
    # Text read by pd.read_csv is object dtype on pandas < 3, which
    # select_dtypes(include='string') does not match, so both the object and
    # the dedicated string dtype are handled explicitly.
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_object_dtype(column.dtype):
            # Only real strings are stripped; nulls and other objects pass through
            df[name] = column.map(lambda v: v.strip() if isinstance(v, str) else v)
        elif isinstance(column.dtype, pd.StringDtype):
            df[name] = column.str.strip()

    # Step 2: Replace NULL values with the string "N/A"
    # (must happen before the 'category' conversion below)
    df = df.fillna('N/A')

    # Step 3: Convert columns to appropriate data types
    for name in ('customer_id', 'customer_unique_id', 'customer_zip_code_prefix'):
        df[name] = df[name].astype('string')
    for name in ('customer_city', 'customer_state'):
        df[name] = df[name].astype('category')

    # Step 4: Left-pad `customer_zip_code_prefix` with zeros to 5 characters
    df['customer_zip_code_prefix'] = df['customer_zip_code_prefix'].str.zfill(5)

    # Step 5: Keep the first row of every (unique id, zip prefix) pair
    df = df.drop_duplicates(subset=['customer_unique_id', 'customer_zip_code_prefix'])

    # Step 6: Drop the customer_id column
    df = df.drop(columns="customer_id")

    return df

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
customers = spark.createDataFrame(cleaned_data)

# Save the cleaned rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_customers_dataset.csv"
(
    customers.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the cleaned rows
table = 'customers'
(
    customers.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")


Cleaned dataset written to customers table


2. Data Cleaning for "geolocation"
- removed duplicates, keeping the first occurrence
- strip leading/trailing whitespace
- handling null values
- convert columns to appropriate data types

Issues
- should category be used?

In [None]:
# Read the raw geolocation CSV from the olist folder in Blob Storage
file_name = 'olist_geolocation_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw geolocation frame and return one row per zip-code prefix.

    Args:
        df: pandas DataFrame with the columns geolocation_zip_code_prefix,
            geolocation_lat, geolocation_lng, geolocation_city and
            geolocation_state. The frame passed in is not modified.

    Returns:
        A new DataFrame holding the first row seen for each zip-code prefix,
        with ASCII city names, a 5-character string zip prefix, float64
        coordinates and categorical city/state columns.
    """
    # Work on a private copy so the caller's frame is left untouched
    df = df.copy()

    # Step 1: Transliterate accented city names to plain ASCII.
    # Non-text values (e.g. NaN) are passed through instead of being handed to unidecode.
    df['geolocation_city'] = df['geolocation_city'].map(
        lambda value: unidecode(value) if isinstance(value, str) else value
    )

    # Step 2: Remove duplicates in `geolocation_zip_code_prefix`, keeping the first occurrence.
    # .copy() detaches the result so the column assignments below do not warn.
    df = df.drop_duplicates(subset='geolocation_zip_code_prefix').copy()

    # Step 3: Strip leading/trailing whitespace from every text column.
    # Text read by pd.read_csv is object dtype on pandas < 3, which
    # select_dtypes(include='string') does not match, so both the object and
    # the dedicated string dtype are handled explicitly.
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_object_dtype(column.dtype):
            df[name] = column.map(lambda v: v.strip() if isinstance(v, str) else v)
        elif isinstance(column.dtype, pd.StringDtype):
            df[name] = column.str.strip()

    # Step 4: Replace NULL values with the string "N/A"
    # (must happen before the 'category' conversion below)
    df = df.fillna('N/A')

    # Step 5: Convert columns to appropriate data types
    df['geolocation_zip_code_prefix'] = df['geolocation_zip_code_prefix'].astype('string')
    df['geolocation_lat'] = df['geolocation_lat'].astype('float64')
    df['geolocation_lng'] = df['geolocation_lng'].astype('float64')
    df['geolocation_city'] = df['geolocation_city'].astype('category')
    df['geolocation_state'] = df['geolocation_state'].astype('category')

    # Step 6: Left-pad `geolocation_zip_code_prefix` with zeros to 5 characters
    df['geolocation_zip_code_prefix'] = df['geolocation_zip_code_prefix'].str.zfill(5)

    return df

# Clean the dataset
cleaned_data = clean_data(data)

# Convert Pandas DataFrame to Spark DataFrame
geolocation = spark.createDataFrame(cleaned_data)

# Write cleaned CSV to Blob Storage cleaned folder
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_geolocation_dataset.csv"
geolocation.write.format("csv").mode("overwrite").save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")

# Write to Azure Postgres
table = 'geolocation'
geolocation.write.format("jdbc").option("url", db_url).option("dbtable", table).mode('overwrite').save()
print(f"Cleaned dataset written to {table} table")


#----------------------------------#
# Extract city names to City table

# Step 1: Extract distinct city_name and state_code. cleaned_data here is the cleaned geolocation frame
city = cleaned_data[['geolocation_city', 'geolocation_state']].drop_duplicates().rename(columns={
    'geolocation_city': 'city_name',
    'geolocation_state': 'state_code'
}).reset_index(drop=True)

# Step 2: Insert an auto-increment city_id (1-based, in order of first appearance)
city.insert(0, 'city_id', range(1, len(city) + 1))

# Step 3: Join geolocation and city DataFrames on the (city, state) pair
merged_df = pd.merge(cleaned_data, city, left_on=['geolocation_city','geolocation_state'], right_on=['city_name','state_code']).rename(columns={
    'geolocation_lat': 'latitude',
    'geolocation_lng': 'longitude'
})

# Step 4: Drop zipcode and duplicated columns (a new frame is bound instead of mutating in place)
merged_df = merged_df.drop(columns=['geolocation_zip_code_prefix','geolocation_city','geolocation_state'])

# Step 5: Compute the average latitude and longitude, grouped by city_id
get_lat_lng = merged_df.groupby(['city_id'], observed=False).agg({
    'latitude': 'mean',
    'longitude': 'mean'
})

# Step 6: Merge the city with its lat/lng
city = pd.merge(city, get_lat_lng, on='city_id').reset_index(drop=True)

# Step 7: Correct wrong latitude/longitude of cities (located in Portugal/Spain) to Brazil
city.loc[city['city_name'] == 'porto trombetas', ['latitude','longitude']] = [-1.743514558,-52.24416336]
city.loc[city['city_name'] == 'santa lucia do piai', ['latitude','longitude']]  = [-29.241292800, -51.021271670]
city.loc[city['city_name'] == 'bom retiro da esperanca', ['latitude','longitude']] = [-23.520184363, -48.286817029]
city.loc[city['city_name'] == 'areia branca dos assis', ['latitude','longitude']] = [-25.867626304, -49.368047063]
city.loc[city['city_name'] == 'ilha dos valadares', ['latitude','longitude']] = [-25.533502571, -48.508189284]
# NOTE(review): this id-based override reuses the 'ilha dos valadares' coordinates and
# depends on row order; it is kept as-is - confirm which city has id 817.
city.loc[city['city_id'] == 817, ['latitude','longitude']] = [-25.533502571, -48.508189284]
# The earlier name-based attempt for this city failed because the two numbers were
# separated by a tab instead of a comma, which made the list a single subtraction.
# It runs after the id-based line so the intended coordinates win if both hit the same row.
city.loc[city['city_name'] == 'vila nova de campos', ['latitude','longitude']] = [-24.57678608, -53.79553808]

# Convert Pandas DataFrame to Spark DataFrame (the name `city` now refers to the Spark frame)
city = spark.createDataFrame(city)

# Write cleaned CSV to Blob Storage cleaned folder
cleaned_file = "cleaned_olist_city_dataset.csv"
city.write.format("csv").mode("overwrite").save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")

# Write to Azure Postgres
table = 'city'
city.write.format("jdbc").option("url", db_url).option("dbtable", table).mode('overwrite').save()
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to geolocation table
Cleaned dataset written to city table


3. Data Cleaning for "order_items"
- convert columns to appropriate data types
- removed duplicates
- strip leading/trailing whitespace
- handling null values

Issues
- The below code did not work, so changed to allow pandas to automatically infer the format by not specifying the format parameter
    - df['shipping_limit_date'] = pd.to_datetime(df['shipping_limit_date'], format='%d/%m/%Y %I:%M:%S %p')
    - df['shipping_limit_date'] = pd.to_datetime(df['shipping_limit_date'])

In [None]:
# Read the raw order-items CSV from the olist folder in Blob Storage
file_name = 'olist_order_items_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw order-items frame and return a new frame.

    Args:
        df: pandas DataFrame with the columns order_id, order_item_id,
            product_id, seller_id, shipping_limit_date, price and
            freight_value. The frame passed in is not modified.

    Returns:
        A new DataFrame with one row per (order_id, order_item_id) pair,
        string ids, int64 order_item_id, datetime shipping_limit_date and
        Decimal price/freight_value.
    """
    # Work on a private copy so the caller's frame is left untouched
    df = df.copy()

    # Step 1: Strip leading/trailing whitespace from every text column.
    # Text read by pd.read_csv is object dtype on pandas < 3, which
    # select_dtypes(include='string') does not match, so both the object and
    # the dedicated string dtype are handled explicitly.
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_object_dtype(column.dtype):
            df[name] = column.map(lambda v: v.strip() if isinstance(v, str) else v)
        elif isinstance(column.dtype, pd.StringDtype):
            df[name] = column.str.strip()

    # Step 2: Remove duplicate item rows, keeping the first occurrence.
    # An order holds one row per item, so the key is the (order_id, order_item_id)
    # pair; de-duplicating on order_id alone would discard every item after the first.
    df = df.drop_duplicates(subset=['order_id', 'order_item_id'])

    # Step 3: Replace NULL values with the string "N/A"
    df = df.fillna('N/A')

    # Step 4: Convert columns to appropriate data types
    df['order_id'] = df['order_id'].astype('string')
    df['order_item_id'] = df['order_item_id'].astype('int64')
    df['product_id'] = df['product_id'].astype('string')
    df['seller_id'] = df['seller_id'].astype('string')
    df['shipping_limit_date'] = pd.to_datetime(df['shipping_limit_date'])
    # Go through str() so 58.9 becomes Decimal('58.9'); Decimal(58.9) would
    # store the float's full binary expansion instead of the quoted amount.
    df['price'] = df['price'].map(lambda v: Decimal(str(v)))
    df['freight_value'] = df['freight_value'].map(lambda v: Decimal(str(v)))

    return df

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
order_items = spark.createDataFrame(cleaned_data)

# Save the cleaned rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_order_items_dataset.csv"
(
    order_items.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the cleaned rows
table = 'order_items'
(
    order_items.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to order_items table


4. Data Cleaning for "order_payments"
- removed duplicates
- strip leading/trailing whitespace
- handling null values
- convert columns to appropriate data types

In [None]:
# Read the raw order-payments CSV from the olist folder in Blob Storage
file_name = 'olist_order_payments_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw order-payments frame and return a new frame.

    Args:
        df: pandas DataFrame with the columns order_id, payment_sequential,
            payment_type, payment_installments and payment_value.
            The frame passed in is not modified.

    Returns:
        A new DataFrame with one row per (order_id, payment_sequential) pair,
        string order_id, int8 payment_sequential/payment_installments,
        categorical payment_type and Decimal payment_value.
    """
    # Work on a private copy so the caller's frame is left untouched
    df = df.copy()

    # Step 1: Strip leading/trailing whitespace from every text column.
    # Text read by pd.read_csv is object dtype on pandas < 3, which
    # select_dtypes(include='string') does not match, so both the object and
    # the dedicated string dtype are handled explicitly.
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_object_dtype(column.dtype):
            df[name] = column.map(lambda v: v.strip() if isinstance(v, str) else v)
        elif isinstance(column.dtype, pd.StringDtype):
            df[name] = column.str.strip()

    # Step 2: Remove duplicate payment rows, keeping the first occurrence.
    # An order can be paid in several parts, one row per payment_sequential, so
    # the key is the (order_id, payment_sequential) pair; de-duplicating on
    # order_id alone would discard every payment after the first.
    df = df.drop_duplicates(subset=['order_id', 'payment_sequential'])

    # Step 3: Replace NULL values with the string "N/A"
    # (must happen before the 'category' conversion below)
    df = df.fillna('N/A')

    # Step 4: Convert columns to appropriate data types
    df['order_id'] = df['order_id'].astype('string')
    df['payment_sequential'] = df['payment_sequential'].astype('int8')
    df['payment_type'] = df['payment_type'].astype('category')
    df['payment_installments'] = df['payment_installments'].astype('int8')
    # Go through str() so 24.39 becomes Decimal('24.39'); Decimal(24.39) would
    # store the float's full binary expansion instead of the quoted amount.
    df['payment_value'] = df['payment_value'].map(lambda v: Decimal(str(v)))

    return df

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
order_payments = spark.createDataFrame(cleaned_data)

# Save the cleaned rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_order_payments_dataset.csv"
(
    order_payments.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the cleaned rows
table = 'order_payments'
(
    order_payments.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to order_payments table


5. Data Cleaning for "order_reviews"
- removed duplicates
- strip leading/trailing whitespace
- handling null values
- convert columns to appropriate data types

Issues 
- retained review_score as 'int64' instead of 'category' in case we would like to do mathematical operations on it, such as applying some statistical analysis

In [None]:
# Read the raw order-reviews CSV from the olist folder in Blob Storage
file_name = 'olist_order_reviews_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw order-reviews frame and return a new frame.

    Args:
        df: pandas DataFrame with the columns review_id, order_id,
            review_score, review_comment_title, review_comment_message,
            review_creation_date and review_answer_timestamp.
            The frame passed in is not modified.

    Returns:
        A new DataFrame holding the first review row of every order_id, with
        string ids/comments ('N/A' where missing), int64 review_score,
        datetime review_creation_date and date-only review_answer_timestamp.
    """
    # Work on a private copy so the caller's frame is left untouched
    df = df.copy()

    # Step 1: Strip leading/trailing whitespace from every text column.
    # Text read by pd.read_csv is object dtype on pandas < 3, which
    # select_dtypes(include='string') does not match, so both the object and
    # the dedicated string dtype are handled explicitly.
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_object_dtype(column.dtype):
            df[name] = column.map(lambda v: v.strip() if isinstance(v, str) else v)
        elif isinstance(column.dtype, pd.StringDtype):
            df[name] = column.str.strip()

    # Step 2: Remove duplicates in `order_id` if any, keeping the first occurrence
    # NOTE(review): this keeps a single review per order - confirm that later
    # reviews of the same order are meant to be discarded.
    df = df.drop_duplicates(subset='order_id')

    # Step 3: Replace NULL values with the string "N/A"
    df = df.fillna('N/A')

    # Step 4: Convert columns to appropriate data types
    for name in ('review_id', 'order_id', 'review_comment_title', 'review_comment_message'):
        df[name] = df[name].astype('string')
    df['review_score'] = df['review_score'].astype('int64')
    df['review_creation_date'] = pd.to_datetime(df['review_creation_date'])
    # Only the calendar date of the answer is kept; the time of day is dropped
    df['review_answer_timestamp'] = pd.to_datetime(df['review_answer_timestamp']).dt.date

    return df

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
order_reviews = spark.createDataFrame(cleaned_data)

# Save the cleaned rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_order_reviews_dataset.csv"
(
    order_reviews.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the cleaned rows
table = 'order_reviews'
(
    order_reviews.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to order_reviews table


6. Data Cleaning for "orders"
- convert columns to appropriate data types
- removed duplicates
- strip leading/trailing whitespace
- handling null values

Issues 
- if set to_datetime first, .fillna later is ok, but if .fillna first, to_datetime later is not ok
- however if set to 'category' first, .fillna later is not ok
- hence the solution here is to set as 'string' first, then .fillna, then set as 'category'

In [None]:
# Read the raw orders CSV from the olist folder in Blob Storage
file_name = 'olist_orders_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw orders frame and swap customer_id for customer_unique_id.

    The dtype conversions of the first step are written onto the frame that
    is passed in; every later step works on new frames. Reads the raw
    customers CSV from `azurl` to look up customer_unique_id.

    Args:
        df: pandas DataFrame with the columns order_id, customer_id,
            order_status and the five order timestamp/date columns.

    Returns:
        A DataFrame with one row per order_id, categorical order_status,
        timestamps floored to the hour (the estimated delivery date is only
        parsed), and customer_unique_id in place of customer_id.
    """
    text_columns = ('order_id', 'customer_id', 'order_status')
    hourly_columns = (
        'order_purchase_timestamp',
        'order_approved_at',
        'order_delivered_carrier_date',
        'order_delivered_customer_date',
    )

    # Step 1: Convert columns to appropriate data types.
    # order_status stays 'string' for now; it becomes a category in step 5.
    for name in text_columns:
        df[name] = df[name].astype('string')
    for name in hourly_columns:
        df[name] = pd.to_datetime(df[name]).dt.floor('h')
    df['order_estimated_delivery_date'] = pd.to_datetime(df['order_estimated_delivery_date'])

    # Step 2: Keep the first row of every order_id
    df = df.drop_duplicates(subset='order_id')

    # Step 3: Strip leading/trailing whitespace from the 'string' dtype columns
    for name in df.select_dtypes(include='string').columns:
        df[name] = df[name].str.strip()

    # Step 4: NULL values are deliberately left as they are in this table
    # (the blanket fillna('N/A') used for the other tables is disabled here).

    # Step 5: Convert 'order_status' to category
    df['order_status'] = df['order_status'].astype('category')

    # Step 6: Add customer_unique_id from the raw customers file, then drop customer_id
    customers_raw = pd.read_csv(azurl + 'olist_customers_dataset.csv')
    id_lookup = customers_raw[['customer_id', 'customer_unique_id']]
    df = df.merge(id_lookup, left_on='customer_id', right_on='customer_id', how='left')

    return df.drop(columns=['customer_id'])

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
orders = spark.createDataFrame(cleaned_data)

# Save the cleaned rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_orders_dataset.csv"
(
    orders.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the cleaned rows
table = 'orders'
(
    orders.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to orders table


7. Data Cleaning for "products"
- removed duplicates
- strip leading/trailing whitespace
- handling null values in string columns
- handling NaN or inf in integer columns
- convert columns to appropriate data types

Issues
- issue is that .fillna has to come before .astype('category'), and some columns had null values which cannot be set .astype('int64')
- hence, step 3 & 4 separated into str columns and int columns, for filling with 'N/A' or '0' respectively
- subsequently, step 5 conversion into 'string', 'category' and 'int64' data types

In [None]:
# Read the raw products CSV from the olist folder in Blob Storage
file_name = 'olist_products_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw products frame and translate its category names to English.

    Reads the category translation CSV from `azurl`. The products themselves
    are taken from the `df` argument rather than being read a second time.

    Args:
        df: pandas DataFrame with the columns product_id,
            product_category_name, product_name_lenght,
            product_description_lenght, product_photos_qty, product_weight_g,
            product_length_cm, product_height_cm and product_width_cm.
            The frame passed in is not modified.

    Returns:
        A new DataFrame with one row per product_id, a categorical
        `product_category` column holding the English name ('N/A' when the
        category is missing or has no translation row), int64 measurement
        columns with missing/infinite values set to 0, and the two
        '..._lenght' columns renamed to '..._length'.
    """
    # Step 1: Translate Portuguese category names to English.
    # merge() returns a new frame, so the caller's frame is left untouched.
    translation_df = pd.read_csv(azurl + 'product_category_name_translation.csv')
    df = df.merge(translation_df, on='product_category_name', how='left')
    # Move the English name to position 1 and discard the Portuguese one
    df.insert(1, 'product_category_name_english_merged', df.pop('product_category_name_english'))
    df = df.drop(columns=['product_category_name'])

    # Step 2: Strip leading/trailing whitespace from every text column.
    # Text read by pd.read_csv is object dtype on pandas < 3, which
    # select_dtypes(include='string') does not match, so both the object and
    # the dedicated string dtype are handled explicitly.
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_object_dtype(column.dtype):
            df[name] = column.map(lambda v: v.strip() if isinstance(v, str) else v)
        elif isinstance(column.dtype, pd.StringDtype):
            df[name] = column.str.strip()

    # Step 3: Remove duplicates in `product_id` if any, keeping the first occurrence.
    # .copy() detaches the result so the column assignments below do not warn.
    df = df.drop_duplicates(subset='product_id').copy()

    # Step 4: Replace NULL values with the string "N/A" in the string columns
    # (must happen before the 'category' conversion below)
    str_columns = ['product_id', 'product_category_name_english_merged']
    df[str_columns] = df[str_columns].fillna('N/A')

    # Step 5: Replace NaN or inf with 0 in the integer columns so they can be cast to int64
    int_columns = ['product_name_lenght', 'product_description_lenght', 'product_photos_qty',
                   'product_weight_g', 'product_length_cm', 'product_height_cm', 'product_width_cm']
    df[int_columns] = df[int_columns].replace([np.nan, np.inf, -np.inf], 0)

    # Step 6: Convert columns to appropriate data types
    df['product_id'] = df['product_id'].astype('string')
    df['product_category_name_english_merged'] = df['product_category_name_english_merged'].astype('category')
    for name in int_columns:
        df[name] = df[name].astype('int64')

    # Step 7: Shorten the category column name and fix the 'lenght' spelling
    df = df.rename(columns={
        'product_category_name_english_merged': 'product_category',
        'product_name_lenght': 'product_name_length',
        'product_description_lenght': 'product_description_length',
    })

    return df

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
products = spark.createDataFrame(cleaned_data)

# Save the cleaned rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_products_dataset.csv"
(
    products.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the cleaned rows
table = 'products'
(
    products.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to products table


8. Data Cleaning for "sellers"
- removed duplicates
- strip leading/trailing whitespace
- handling null values
- convert columns to appropriate data types

In [None]:
# Read the raw sellers CSV from the olist folder in Blob Storage
file_name = 'olist_sellers_dataset.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)

# Clean the data
def clean_data(df):
    """Clean the raw sellers frame and return a new, de-duplicated frame.

    Args:
        df: pandas DataFrame with the columns seller_id,
            seller_zip_code_prefix, seller_city and seller_state.
            The frame passed in is not modified.

    Returns:
        A new DataFrame holding the first row of every seller_id, with a
        string seller_id, a 5-character string zip prefix and categorical
        city/state columns.
    """
    # Work on a private copy so the caller's frame is left untouched
    df = df.copy()

    # Step 1: Strip leading/trailing whitespace from every text column.
    # Done before de-duplication so ids that differ only by padding collapse.
    # Text read by pd.read_csv is object dtype on pandas < 3, which
    # select_dtypes(include='string') does not match, so both the object and
    # the dedicated string dtype are handled explicitly.
    for name in df.columns:
        column = df[name]
        if pd.api.types.is_object_dtype(column.dtype):
            df[name] = column.map(lambda v: v.strip() if isinstance(v, str) else v)
        elif isinstance(column.dtype, pd.StringDtype):
            df[name] = column.str.strip()

    # Step 2: Remove duplicates in `seller_id` if any, keeping the first occurrence
    df = df.drop_duplicates(subset='seller_id')

    # Step 3: Replace NULL values with the string "N/A"
    # (must happen before the 'category' conversion below)
    df = df.fillna('N/A')

    # Step 4: Convert columns to appropriate data types
    df['seller_id'] = df['seller_id'].astype('string')
    df['seller_zip_code_prefix'] = df['seller_zip_code_prefix'].astype('string')
    df['seller_city'] = df['seller_city'].astype('category')
    df['seller_state'] = df['seller_state'].astype('category')

    # Step 5: Left-pad `seller_zip_code_prefix` with zeros to 5 characters
    df['seller_zip_code_prefix'] = df['seller_zip_code_prefix'].str.zfill(5)

    return df

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
sellers = spark.createDataFrame(cleaned_data)

# Save the cleaned rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "cleaned_olist_sellers_dataset.csv"
(
    sellers.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the cleaned rows
table = 'sellers'
(
    sellers.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to sellers table


In [None]:
# Lookup table of state names
# (the source file comes from another Kaggle user's dataset)

# Read the states CSV from the misc sub-folder of the olist folder
file_name = 'misc/states.csv'
file = f"{azurl}{file_name}"
data = pd.read_csv(file)


def clean_data(df):
    """Build the state lookup table from the raw states frame.

    Args:
        df: pandas DataFrame containing at least the columns 'UF' and 'State'.

    Returns:
        A two-column DataFrame: `code` (taken from 'UF') and `state`
        (taken from 'State', transliterated by unidecode and title-cased).
    """
    # Keep only the state code and name, under their new column names
    lookup = df[['UF', 'State']].rename(columns={'UF': 'code', 'State': 'state'})

    # Transliterate accented names to plain letters, then title-case them
    ascii_names = lookup['state'].apply(lambda x: unidecode(x))
    lookup['state'] = ascii_names.str.title()

    return lookup

# Run the cleaning routine on the raw frame
cleaned_data = clean_data(data)

# Hand the pandas result over to Spark
states = spark.createDataFrame(cleaned_data)

# Save the lookup rows as CSV under the cleaned folder in Blob Storage
# NOTE(review): no "header" option is set on the CSV writer - confirm a header-less file is intended
cleaned_file = "states_lookup.csv"
(
    states.write
    .format("csv")
    .mode("overwrite")
    .save(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/{cleaned_folder}{cleaned_file}")
)

# Overwrite the target table in Azure Postgres with the lookup rows
table = 'states'
(
    states.write
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", table)
    .mode('overwrite')
    .save()
)
print(f"Cleaned dataset written to {table} table")

Cleaned dataset written to states table
