# Milestone 2: Extract and clean the data from the data sources.
##### Your first mission will be to extract all the data from the multitude of data sources, clean it, and then store it in a database that you will create. NOTE: The data is artificial so some columns might not give realistic values, for instance the latitude, longitude and card numbers columns. The rows you will need to clean will have clear errors; think missing values or characters that shouldn't be there.

In [1]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning

aws_connector = DatabaseConnector(
    creds_file="aws_db_creds.yaml"
)  # Initialize AWS connector for extraction
data_extractor = DataExtractor()
data_cleaner = DataCleaning()
local_connector = DatabaseConnector(
    creds_file="local_db_creds.yaml"
)  # Initialise local connector for uploading

## Task 3

In [4]:
# Step 1: Extract data from AWS RDS
table_names_list = aws_connector.list_db_tables()  # Find table name
print("List of table names:", table_names_list)
# Output: List of table names: ['legacy_store_details', 'dim_card_details', 'legacy_users', 'orders_table']
user_table_name = table_names_list[2]
user_df = data_extractor.read_rds_table(aws_connector, user_table_name)

# Step 2: Clean data
cleaned_user_df = data_cleaner.clean_user_data(user_df)
print(cleaned_user_df.info())

# Step 3: Use local connector for uploading
local_connector.upload_to_db(cleaned_user_df, "dim_users")
print("Uploaded cleaned user data to the local sales_data database.")

List of table names: ['legacy_store_details', 'dim_card_details', 'legacy_users', 'orders_table']
Initial number of rows: 15320
Rows after replacing 'NULL' strings: 15320
Rows after dropping NULLs: 15299
Rows after converting 'join_date': 15299
Final rows after date cleaning in user data: 15284
<class 'pandas.core.frame.DataFrame'>
Index: 15284 entries, 0 to 15319
Data columns (total 12 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   index          15284 non-null  int64         
 1   first_name     15284 non-null  object        
 2   last_name      15284 non-null  object        
 3   date_of_birth  15284 non-null  object        
 4   company        15284 non-null  object        
 5   email_address  15284 non-null  object        
 6   address        15284 non-null  object        
 7   country        15284 non-null  object        
 8   country_code   15284 non-null  object        
 9   phone_number   15284 non-null  obj

## Task 4

In [1]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning

data_extractor = DataExtractor()
data_cleaner = DataCleaning()
local_connector = DatabaseConnector(
    creds_file="local_db_creds.yaml"
)  # Initialise local connector for uploading
pdf_link = "https://data-handling-public.s3.eu-west-1.amazonaws.com/card_details.pdf"
card_df = data_extractor.retrieve_pdf_data(pdf_link)
# print("Card data extracted from PDF:")
# print(card_df.info()

# Step 2: Clean card data
cleaned_card_df = data_cleaner.clean_card_data(card_df)
print(cleaned_card_df.info())
# Step 3: Initialize local connector for uploading
local_connector.upload_to_db(cleaned_card_df, "dim_card_details")
print("Uploaded cleaned card data to the local sales_data database.")

Failed to import jpype dependencies. Fallback to subprocess.
No module named 'technology'


Initial number of rows: 15309
Rows after replacing 'NULL' strings: 15309
Rows after dropping NULLs: 15298
Rows after removing duplicates: 15298
Rows with modified card numbers exported to debug_csv/task_4/modified_card_numbers.csv
Rows after cleaning card numbers: 15297
Final rows after date cleaning: 15284
<class 'pandas.core.frame.DataFrame'>
Index: 15284 entries, 0 to 15308
Data columns (total 4 columns):
 #   Column                  Non-Null Count  Dtype         
---  ------                  --------------  -----         
 0   card_number             15284 non-null  object        
 1   expiry_date             15284 non-null  object        
 2   card_provider           15284 non-null  object        
 3   date_payment_confirmed  15284 non-null  datetime64[ns]
dtypes: datetime64[ns](1), object(3)
memory usage: 597.0+ KB
None
Uploaded 15284 rows to dim_card_details.
Uploaded cleaned card data to the local sales_data database.


## Task 5

In [1]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
import logging

try:
    # Initialize classes
    data_extractor = DataExtractor()
    data_cleaner = DataCleaning()
    local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")

    # Step 1: Retrieve the number of stores
    number_of_stores_endpoint = (
        "https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/number_stores"
    )
    num_stores = data_extractor.list_number_of_stores(number_of_stores_endpoint)
    print(f"Number of stores to retrieve: {num_stores}")

    # Step 2: Retrieve all stores data
    retrieve_store_endpoint = (
        "https://aqj7u5id95.execute-api.eu-west-1.amazonaws.com/prod/store_details"
    )
    stores_df = data_extractor.retrieve_stores_data(
        retrieve_store_endpoint, num_stores
    )
    print(f"Retrieved {len(stores_df)} stores from the API.")

    # Step 3: Clean the store data
    cleaned_stores_df = data_cleaner.clean_store_data(stores_df)
    print(f"Cleaned store data has {len(cleaned_stores_df)} rows.")
    print(cleaned_stores_df.info())

    # Step 4: Upload the cleaned data to the database
    local_connector.upload_to_db(cleaned_stores_df, "dim_store_details")
    print("Uploaded cleaned store data to the local sales_data database.")

except Exception as e:
    logging.error(f"Error in main workflow: {e}")

Number of stores to retrieve: 451
Retrieved 451 stores from the API.
Initial rows: 451
Rows after replacing invalid strings with pd.NA: 451
Rows after dropping NULLs: 447
Rows after staff number cleanup: 447
Rows after converting opening_date column into a datetime data type: 440
Store 0 reintegrated after cleaning.
Final rows after date validation: 441
Cleaned store data has 441 rows.
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 441 entries, 0 to 440
Data columns (total 12 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   index          441 non-null    int64 
 1   address        440 non-null    object
 2   longitude      440 non-null    object
 3   lat            0 non-null      object
 4   locality       440 non-null    object
 5   store_code     441 non-null    object
 6   staff_numbers  441 non-null    object
 7   opening_date   441 non-null    object
 8   store_type     441 non-null    object
 9   latitude       440 non-null 

## Task 6

In [1]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning

# Step 1: Extract data from S3
data_extractor = DataExtractor()
s3_address = "s3://data-handling-public/products.csv"
products_df = data_extractor.extract_from_s3(s3_address)

# Step 2: Convert product weights unit to kg
data_cleaner = DataCleaning()
products_df = data_cleaner.convert_product_weights(products_df)

# Step 3: Clean products data
products_df = data_cleaner.clean_products_data(products_df)
print(products_df.info())

# Step 4: Upload to database
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
local_connector.upload_to_db(products_df, "dim_products")

print("Uploaded cleaned products data to the local sales_data database.")

Initial rows: 1853
Rows after replacing invalid strings: 1853
Row 1779: Set weight to 0 (previously NULL) as all the other values are valid.
Rows after dropping NULLs: 1846
Rows after converting weights to kg: 1846
Final rows after cleaning: 1846
<class 'pandas.core.frame.DataFrame'>
Index: 1846 entries, 0 to 1852
Data columns (total 10 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   Unnamed: 0     1846 non-null   int64 
 1   product_name   1846 non-null   object
 2   product_price  1846 non-null   object
 3   weight         1846 non-null   object
 4   category       1846 non-null   object
 5   EAN            1846 non-null   object
 6   date_added     1846 non-null   object
 7   uuid           1846 non-null   object
 8   removed        1846 non-null   object
 9   product_code   1846 non-null   object
dtypes: int64(1), object(9)
memory usage: 158.6+ KB
None
Uploaded cleaned products data to the local sales_data database.


## Task 7

### Step 1: List All Tables in the Database

In [1]:
from database_utils import DatabaseConnector

# Initialize the AWS RDS connector
aws_connector = DatabaseConnector(creds_file="aws_db_creds.yaml")

# List all tables in the database
table_names = aws_connector.list_db_tables()
print("List of table names:", table_names)

List of table names: ['legacy_store_details', 'dim_card_details', 'legacy_users', 'orders_table']


### Step 2: Extract Orders Data

In [2]:
from data_extraction import DataExtractor

# Initialize the data extractor
data_extractor = DataExtractor()

# Extract the orders data
orders_df = data_extractor.read_rds_table(aws_connector, "orders_table")
print("Orders data extracted successfully.")
print(orders_df.info())

Orders data extracted successfully.
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 120123 entries, 0 to 120122
Data columns (total 11 columns):
 #   Column            Non-Null Count   Dtype  
---  ------            --------------   -----  
 0   level_0           120123 non-null  int64  
 1   index             120123 non-null  int64  
 2   date_uuid         120123 non-null  object 
 3   first_name        15284 non-null   object 
 4   last_name         15284 non-null   object 
 5   user_uuid         120123 non-null  object 
 6   card_number       120123 non-null  int64  
 7   store_code        120123 non-null  object 
 8   product_code      120123 non-null  object 
 9   1                 0 non-null       float64
 10  product_quantity  120123 non-null  int64  
dtypes: float64(1), int64(4), object(6)
memory usage: 10.1+ MB
None


### Step 3: Clean Orders Data

### Step 4: Upload Cleaned Data to Database

In [3]:
from data_cleaning import DataCleaning

# Initialize the data cleaner
data_cleaner = DataCleaning()

# Clean the orders data
cleaned_orders_df = data_cleaner.clean_orders_data(orders_df)
print(cleaned_orders_df.info())

# Initialize the local database connector
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")

# Upload the cleaned data to the local database
local_connector.upload_to_db(cleaned_orders_df, "orders_table")
print("Uploaded cleaned orders data to the local sales_data database.")

Rows after cleaning: 120123
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 120123 entries, 0 to 120122
Data columns (total 8 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   level_0           120123 non-null  int64 
 1   index             120123 non-null  int64 
 2   date_uuid         120123 non-null  object
 3   user_uuid         120123 non-null  object
 4   card_number       120123 non-null  int64 
 5   store_code        120123 non-null  object
 6   product_code      120123 non-null  object
 7   product_quantity  120123 non-null  int64 
dtypes: int64(4), object(4)
memory usage: 7.3+ MB
None
Uploaded 120123 rows to orders_table.
Uploaded cleaned orders data to the local sales_data database.


## Task 8

In [4]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning

# Step 1: Extract JSON data from S3
data_extractor = DataExtractor()
s3_url = "https://data-handling-public.s3.eu-west-1.amazonaws.com/date_details.json"
date_times_df = data_extractor.extract_json_from_s3(s3_url)

if not date_times_df.empty:

    # Step 2: Clean the date times data
    data_cleaner = DataCleaning()
    cleaned_date_times_df = data_cleaner.clean_date_times_data(date_times_df)
    print(date_times_df.info())

    # Step 3: Upload cleaned data to the local database
    local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
    local_connector.upload_to_db(cleaned_date_times_df, "dim_date_times")
    print("Uploaded cleaned date times data to the local sales_data database.")
else:
    print("Failed to extract date times data from S3.")

Date times data extracted successfully.
Rows after cleaning: 120123
<class 'pandas.core.frame.DataFrame'>
Index: 120123 entries, 0 to 120160
Data columns (total 6 columns):
 #   Column       Non-Null Count   Dtype 
---  ------       --------------   ----- 
 0   timestamp    120123 non-null  object
 1   month        120123 non-null  int64 
 2   year         120123 non-null  int64 
 3   day          120123 non-null  int64 
 4   time_period  120123 non-null  object
 5   date_uuid    120123 non-null  object
dtypes: int64(3), object(3)
memory usage: 6.4+ MB
None
Uploaded 120123 rows to dim_date_times.
Uploaded cleaned date times data to the local sales_data database.


# Milestone 3: Create the database schema
##### Develop the star-based schema of the database, ensuring that the columns are of the correct data types.

## Task 1

In [8]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from sqlalchemy import UUID, VARCHAR, SMALLINT

# Step 1: Extract orders data
data_extractor = DataExtractor()
aws_connector = DatabaseConnector(creds_file="aws_db_creds.yaml")
orders_df = data_extractor.read_rds_table(aws_connector, "orders_table")

# Step 2: Clean orders data
data_cleaner = DataCleaning()
cleaned_orders_df = data_cleaner.clean_orders_data(orders_df)

# Calculate maximum lengths
max_card_number_length = cleaned_orders_df["card_number"].astype(str).str.len().max()
max_store_code_length = cleaned_orders_df["store_code"].astype(str).str.len().max()
max_product_code_length = cleaned_orders_df["product_code"].astype(str).str.len().max()

# print(f"Max card_number length: {max_card_number_length}")
# print(f"Max store_code length: {max_store_code_length}")
# print(f"Max product_code length: {max_product_code_length}")

# Step 3: Define correct data types
data_types = {
    "date_uuid": UUID,
    "user_uuid": UUID,
    "card_number": VARCHAR(max_card_number_length),
    "store_code": VARCHAR(max_store_code_length),
    "product_code": VARCHAR(max_product_code_length),
    "product_quantity": SMALLINT,
}

# Step 4: Upload to database with correct data types
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
local_connector.upload_to_db(cleaned_orders_df, "orders_table", dtype=data_types)

Rows after cleaning: 120123
Uploaded 120123 rows to orders_table.


In [None]:
# Run following SQL Query to Check Data Types in pgAdmin 4
# SELECT column_name, data_type
# FROM information_schema.columns
# WHERE table_name = 'orders_table';

## Task 2

In [13]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from sqlalchemy import UUID, VARCHAR, DATE

# Step 1: Extract users data
data_extractor = DataExtractor()
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
dim_users_df = data_extractor.read_rds_table(local_connector, "dim_users")

# Step 2: Clean users data (if necessary)
data_cleaner = DataCleaning()
cleaned_dim_users_df = data_cleaner.clean_user_data(dim_users_df)

# Calculate maximum length of country_code
max_country_code_length = dim_users_df["country_code"].astype(str).str.len().max()
max_first_name_length = dim_users_df["first_name"].astype(str).str.len().max()
max_last_name_length = dim_users_df["last_name"].astype(str).str.len().max()
print(f"Max country_code length: {max_country_code_length}")
print(f"Max first_name length: {max_first_name_length}")
print(f"Max last_name length: {max_last_name_length}")

# Step 3: Define correct data types
data_types = {
    "first_name": VARCHAR(max_first_name_length),
    "last_name": VARCHAR(max_last_name_length),
    "date_of_birth": DATE,
    "country_code": VARCHAR(max_country_code_length),
    "user_uuid": UUID,
    "join_date": DATE,
}

# Step 4: Upload to database with correct data types
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
local_connector.upload_to_db(cleaned_dim_users_df, "dim_users", dtype=data_types)

Initial number of rows: 15284
Rows after replacing 'NULL' strings: 15284
Rows after dropping NULLs: 15284
Rows after converting 'join_date': 15284
Final rows after date cleaning in user data: 15284
Max country_code length: 3
Max first_name length: 14
Max last_name length: 15
Uploaded 15284 rows to dim_users.


## Task 3

In [14]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from sqlalchemy import VARCHAR, NUMERIC, SMALLINT, DATE
import pandas as pd

# Step 1: Extract store details data
data_extractor = DataExtractor()
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
dim_store_details_df = data_extractor.read_rds_table(
    local_connector, "dim_store_details"
)

# Step 2: Clean store details data
data_cleaner = DataCleaning()
cleaned_dim_store_details_df = data_cleaner.clean_store_data(dim_store_details_df)

# Step 3: Merge latitude columns
cleaned_dim_store_details_df["latitude"] = cleaned_dim_store_details_df[
    "latitude"
].combine_first(cleaned_dim_store_details_df["lat"])
cleaned_dim_store_details_df.drop("lat", axis=1, inplace=True)

# Replace "N/A" with NULL in the 'location' column
cleaned_dim_store_details_df["locality"].replace("N/A", pd.NA, inplace=True)

# Step 4: Calculate maximum lengths for VARCHAR columns
max_store_code_length = (
    cleaned_dim_store_details_df["store_code"].astype(str).str.len().max()
)
max_country_code_length = (
    cleaned_dim_store_details_df["country_code"].astype(str).str.len().max()
)
max_locality_length = (
    cleaned_dim_store_details_df["locality"].astype(str).str.len().max()
)
max_continent_length = (
    cleaned_dim_store_details_df["continent"].astype(str).str.len().max()
)
max_store_type_length = (
    cleaned_dim_store_details_df["store_type"].astype(str).str.len().max()
)

print(f"Max store_code length: {max_store_code_length}")
print(f"Max country_code length: {max_country_code_length}")
print(f"Max locality length: {max_locality_length}")
print(f"Max continent length: {max_continent_length}")
print(f"Max store_type length: {max_store_type_length}")

# Step 5: Define correct data types
data_types = {
    "longitude": NUMERIC,
    "locality": VARCHAR(max_locality_length),
    "store_code": VARCHAR(max_store_code_length),
    "staff_numbers": SMALLINT,
    "opening_date": DATE,
    "store_type": VARCHAR(max_store_type_length),
    "latitude": NUMERIC,
    "country_code": VARCHAR(max_country_code_length),
    "continent": VARCHAR(max_continent_length),
}

# Step 6: Upload to database with correct data types
local_connector.upload_to_db(
    cleaned_dim_store_details_df, "dim_store_details", dtype=data_types
)

Initial rows: 441
Rows after replacing invalid strings with pd.NA: 441
Rows after dropping NULLs: 440
Rows after staff number cleanup: 440
Rows after converting opening_date column into a datetime data type: 440
Store 0 reintegrated after cleaning.
Final rows after date validation: 441
Max store_code length: 12
Max country_code length: 2
Max locality length: 20
Max continent length: 9
Max store_type length: 11
Uploaded 441 rows to dim_store_details.


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  cleaned_dim_store_details_df["locality"].replace("N/A", pd.NA, inplace=True)


## Task 4 (Refer to sales_data_milstone3.sql for the SQL code)
##### The following code for Task 4 hasn't been tested

In [None]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from sqlalchemy import VARCHAR, NUMERIC
import pandas as pd

# Step 1: Extract products data
data_extractor = DataExtractor()
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
dim_products_df = data_extractor.read_rds_table(local_connector, "dim_products")

# Step 2: Clean products data
data_cleaner = DataCleaning()
cleaned_dim_products_df = data_cleaner.clean_products_data(dim_products_df)

# Step 3: Remove the '£' character from the 'product_price' column
cleaned_dim_products_df["product_price"] = cleaned_dim_products_df[
    "product_price"
].str.replace("£", "")

# Step 4: Convert the 'weight' column to numeric
cleaned_dim_products_df["weight"] = pd.to_numeric(
    cleaned_dim_products_df["weight"], errors="coerce"
)

# Step 5: Add the 'weight_class' column
cleaned_dim_products_df["weight_class"] = pd.cut(
    cleaned_dim_products_df["weight"],
    bins=[-1, 2, 40, 140, float("inf")],
    labels=["Light", "Mid_Sized", "Heavy", "Truck_Required"],
)

# Step 6: Calculate maximum lengths for VARCHAR columns
max_product_code_length = (
    cleaned_dim_products_df["product_code"].astype(str).str.len().max()
)
max_weight_class_length = (
    cleaned_dim_products_df["weight_class"].astype(str).str.len().max()
)
max_product_price_length = (
    cleaned_dim_products_df["product_price"].astype(str).str.len().max()
)

print(f"Max product_code length: {max_product_code_length}")
print(f"Max weight_class length: {max_weight_class_length}")
print(f"Max product_price length: {max_product_price_length}")

# Step 7: Define correct data types
data_types = {
    "product_code": VARCHAR(max_product_code_length),
    "product_price": NUMERIC,
    "weight": NUMERIC,
    "weight_class": VARCHAR(max_weight_class_length),
}

# Step 8: Upload to database with correct data types
local_connector.upload_to_db(cleaned_dim_products_df, "dim_products", dtype=data_types)

## Task 5

In [15]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from sqlalchemy import VARCHAR, NUMERIC, DATE, UUID, BOOLEAN
import pandas as pd

# Step 1: Extract products data
data_extractor = DataExtractor()
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
dim_products_df = data_extractor.read_rds_table(local_connector, "dim_products")

# Step 2: Clean products data
data_cleaner = DataCleaning()
cleaned_dim_products_df = data_cleaner.clean_products_data(dim_products_df)

# Step 3: Rename the 'removed' column to 'still_available'
cleaned_dim_products_df.rename(columns={"removed": "still_available"}, inplace=True)

# Step 4: Update 'still_available' values
cleaned_dim_products_df["still_available"] = cleaned_dim_products_df[
    "still_available"
].map(
    {
        "Still_avaliable": True,
        "Removed": False,
    }
)

# Step 5: Calculate maximum lengths for VARCHAR columns
max_ean_length = cleaned_dim_products_df["EAN"].astype(str).str.len().max()
max_product_code_length = (
    cleaned_dim_products_df["product_code"].astype(str).str.len().max()
)
max_weight_class_length = (
    cleaned_dim_products_df["weight_class"].astype(str).str.len().max()
)

print(f"Max EAN length: {max_ean_length}")
print(f"Max product_code length: {max_product_code_length}")
print(f"Max weight_class length: {max_weight_class_length}")

# Step 6: Define correct data types
data_types = {
    "product_price": NUMERIC,
    "weight": NUMERIC,
    "EAN": VARCHAR(max_ean_length),
    "product_code": VARCHAR(max_product_code_length),
    "date_added": DATE,
    "uuid": UUID,
    "still_available": BOOLEAN,
    "weight_class": VARCHAR(max_weight_class_length),
}

# Step 7: Upload to database with correct data types
local_connector.upload_to_db(cleaned_dim_products_df, "dim_products", dtype=data_types)

Initial rows: 1846
Rows after replacing invalid strings: 1846
Rows after dropping NULLs: 1846
Rows after converting weights to kg: 1846
Final rows after cleaning: 1846
Max EAN length: 17
Max product_code length: 11
Max weight_class length: 14
Uploaded 1846 rows to dim_products.


## Task 6

In [16]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from sqlalchemy import VARCHAR, UUID
import pandas as pd

# Step 1: Extract date times data
data_extractor = DataExtractor()
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
dim_date_times_df = data_extractor.read_rds_table(local_connector, "dim_date_times")

# Step 2: Clean date times data
data_cleaner = DataCleaning()
cleaned_dim_date_times_df = data_cleaner.clean_date_times_data(dim_date_times_df)

# Step 3: Calculate maximum lengths for VARCHAR columns
max_month_length = cleaned_dim_date_times_df["month"].astype(str).str.len().max()
max_year_length = cleaned_dim_date_times_df["year"].astype(str).str.len().max()
max_day_length = cleaned_dim_date_times_df["day"].astype(str).str.len().max()
max_time_period_length = (
    cleaned_dim_date_times_df["time_period"].astype(str).str.len().max()
)

print(f"Max month length: {max_month_length}")
print(f"Max year length: {max_year_length}")
print(f"Max day length: {max_day_length}")
print(f"Max time_period length: {max_time_period_length}")

# Step 4: Define correct data types
data_types = {
    "month": VARCHAR(max_month_length),
    "year": VARCHAR(max_year_length),
    "day": VARCHAR(max_day_length),
    "time_period": VARCHAR(max_time_period_length),
    "date_uuid": UUID,
}

# Step 5: Upload to database with correct data types
local_connector.upload_to_db(
    cleaned_dim_date_times_df, "dim_date_times", dtype=data_types
)

Rows after cleaning: 120123
Max month length: 2
Max year length: 4
Max day length: 2
Max time_period length: 10
Uploaded 120123 rows to dim_date_times.


## Task 7

In [2]:
from database_utils import DatabaseConnector
from data_extraction import DataExtractor
from data_cleaning import DataCleaning
from sqlalchemy import VARCHAR, DATE
import pandas as pd

# Step 1: Extract card details data
data_extractor = DataExtractor()
local_connector = DatabaseConnector(creds_file="local_db_creds.yaml")
dim_card_details_df = data_extractor.read_rds_table(local_connector, "dim_card_details")

# Step 2: Clean card details data
data_cleaner = DataCleaning()
cleaned_dim_card_details_df = data_cleaner.clean_card_data(dim_card_details_df)

# Step 3: Calculate maximum lengths for VARCHAR columns
max_card_number_length = (
    cleaned_dim_card_details_df["card_number"].astype(str).str.len().max()
)
max_expiry_date_length = (
    cleaned_dim_card_details_df["expiry_date"].astype(str).str.len().max()
)

print(f"Max card_number length: {max_card_number_length}")
print(f"Max expiry_date length: {max_expiry_date_length}")

# Step 4: Define correct data types
data_types = {
    "card_number": VARCHAR(max_card_number_length),
    "expiry_date": VARCHAR(max_expiry_date_length),
    "date_payment_confirmed": DATE,
}

# Step 5: Upload to database with correct data types
local_connector.upload_to_db(
    cleaned_dim_card_details_df, "dim_card_details", dtype=data_types
)

Initial number of rows: 15284
Rows after replacing 'NULL' strings: 15284
Rows after dropping NULLs: 15284
Rows after removing duplicates: 15284
Rows with modified card numbers exported to debug_csv/task_4/modified_card_numbers.csv
Rows after cleaning card numbers: 15284
Final rows after date cleaning: 15284
Max card_number length: 19
Max expiry_date length: 5
Uploaded 15284 rows to dim_card_details.


## Task 8 (Refer to sales_data_milstone3.sql for the SQL code)

## Task 9 (Refer to sales_data_milstone3.sql for the SQL code)