In [1]:
import pandas as pd
import numpy as np
import yaml
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

# Connect to Snowflake

In [2]:
with open("/Users/irene/CloudStation/Portfolio/snowflake-data-load/config/snowflake_key.yaml", "r") as file:
    account_data = yaml.safe_load(file)

In [53]:
conn = snowflake.connector.connect(
    user=account_data["user"],
    password=account_data["password"],
    account=account_data["account"],
    warehouse=account_data["warehouse"],
    database=account_data["database"],
    schema=account_data["schema"]
    )

In [54]:
conn.cursor().execute("USE WAREHOUSE insurance_wh")
conn.cursor().execute("USE DATABASE insurance_db")
conn.cursor().execute("USE SCHEMA insurance_db.insurance_schema")

<snowflake.connector.cursor.SnowflakeCursor at 0x118d82d50>

# Load data

In [6]:
path = "/Users/irene/CloudStation/Portfolio/snowflake-data-load/task-1/raw-data/Analytics Engineer Data Load.xlsx"
df = pd.read_excel(path, sheet_name="Sheet1")

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 25 entries, 0 to 24
Data columns (total 6 columns):
 #   Column       Non-Null Count  Dtype         
---  ------       --------------  -----         
 0   Name         25 non-null     object        
 1   ClaimNumber  25 non-null     object        
 2   Date         25 non-null     datetime64[ns]
 3   Amount       25 non-null     float64       
 4   ID           25 non-null     int64         
 5   Hidden       0 non-null      float64       
dtypes: datetime64[ns](1), float64(2), int64(1), object(2)
memory usage: 1.3+ KB


# Load raw data to INSURANCE_RAW

In [7]:
# drop column
df.drop(columns=['Hidden'], inplace=True)

# rename column
new_column_names = {
    'Name': 'CUSTOMER_NAME',
    'ClaimNumber': 'CLAIM_NUMBER',
    'Date': 'CLAIM_DATE',
    'Amount': 'CLAIM_AMOUNT',
    'ID': 'CUSTOMER_ID'
}

df.rename(columns=new_column_names, inplace=True)

# fix data type 
df['CLAIM_NUMBER'] = df['CLAIM_NUMBER'].astype(str)
df['CUSTOMER_NAME'] = df['CUSTOMER_NAME'].astype(str)
df['CUSTOMER_ID'] = df['CUSTOMER_ID'].astype(str)
df["CLAIM_AMOUNT"] = df["CLAIM_AMOUNT"].astype(float)
df["CLAIM_DATE"] = df["CLAIM_DATE"].dt.strftime('%Y-%m-%d')

## Create table

In [8]:
# define the create table query based on your DataFrame columns
create_table_query = """
CREATE TABLE IF NOT EXISTS INSURANCE_RAW (
    customer_name STRING,
    claim_number STRING,
    claim_date DATE,
    claim_amount FLOAT,
    customer_id STRING
)
"""

# Execute the query
cursor = conn.cursor()
cursor.execute(create_table_query)

True

## Upload

In [9]:
# upload raw data to Snowflake
success, nchunks, nrows, _ = write_pandas(conn, df, 'INSURANCE_RAW')

if success:
    print(f"Successfully loaded {nrows} rows into Snowflake table.")
else:
    print("Failed to load data into Snowflake.")

Successfully loaded 25 rows into Snowflake table.


# Transformation

## Load raw data from INSURANCE_RAW

In [91]:
sql_query = """
SELECT * FROM INSURANCE_RAW
"""

cursor = conn.cursor()
cursor.execute(sql_query)
df_raw = cursor.fetch_pandas_all()

## Cleaning

In [75]:
# drop duplicates across specific columns
def drop_duplicates(df, columns):
    """
    Drop duplicates in a DataFrame based on specified columns.
    
    Parameters:
        df (pandas.DataFrame): Input DataFrame.
        columns (list): List of column names to check for duplicates.
        
    Returns:
        pandas.DataFrame: DataFrame with duplicates dropped.
    """
    # Check for duplicates across specified columns
    duplicates = df.duplicated(subset=columns, keep=False)
    
    # Drop duplicates if they exist
    if duplicates.any():
        df.drop_duplicates(subset=columns, keep='first', inplace=True)
    
    return df

In [92]:
columns_to_check = ['CUSTOMER_NAME', 'CLAIM_NUMBER', 'CLAIM_DATE', 'CLAIM_AMOUNT']

df_cleaned = drop_duplicates(df_raw, columns_to_check)

In [93]:
# remove non-English characters
df_cleaned['CUSTOMER_NAME'] = df_cleaned['CUSTOMER_NAME'].str.replace(r'[^\x00-\x7F]+', '', regex=True)

# remove non-numeric characters 
df_cleaned['CLAIM_NUMBER'] = df_cleaned['CLAIM_NUMBER'].str.replace(r'\D', '', regex=True)

# remove commas and special symbols
df_cleaned['CUSTOMER_NAME'] = df_cleaned['CUSTOMER_NAME'].str.replace(r'[^\w\s]+', '', regex=True)

# # Replace any spaces with single spaces
df_cleaned['CUSTOMER_NAME'] = df_cleaned['CUSTOMER_NAME'].str.replace(r'\s+', ' ', regex=True)


In [94]:
df_cleaned

Unnamed: 0,CUSTOMER_NAME,CLAIM_NUMBER,CLAIM_DATE,CLAIM_AMOUNT,CUSTOMER_ID
0,James Bond,409853945,1984-01-01,45.0,1
1,Peter,5345435435,2021-01-02,546.0,2
2,Steve,543645346,1984-01-03,556.0,3
3,Bill,7756,1987-01-04,764.0,4
4,Scott,40845954,1984-01-05,565.0,5
5,Ben,55663,1989-01-06,776.0,6
6,Jay,45435435,1984-01-07,5667.0,7
7,Smith,453464,2016-12-30,56456.0,8
8,Dr Jacob,4366563674,2016-12-31,34324.0,9
9,Will Smith,3465764,2017-01-01,56546.0,10


## Load df_cleaned to INSURANCE_CLEANED

### Create table

In [96]:
# define the create table query based on your DataFrame columns
create_table_query = """
CREATE TABLE IF NOT EXISTS INSURANCE_CLEANED (
    customer_name STRING,
    claim_number STRING,
    claim_date DATE,
    claim_amount FLOAT,
    customer_id STRING
)
"""

# Execute the query
cursor = conn.cursor()
cursor.execute(create_table_query)

<snowflake.connector.cursor.SnowflakeCursor at 0x11bc75b10>

### Upload

In [97]:
# upload raw data to Snowflake
success, nchunks, nrows, _ = write_pandas(conn, df_cleaned, 'INSURANCE_CLEANED')

if success:
    print(f"Successfully loaded {nrows} rows into Snowflake table.")
else:
    print("Failed to load data into Snowflake.")

  success, nchunks, nrows, _ = write_pandas(conn, df_cleaned, 'INSURANCE_CLEANED')


Successfully loaded 23 rows into Snowflake table.
