In [1]:
from datetime import datetime
import numpy as np
import mysql.connector
import pandas as pd
import requests
from io import StringIO
from sqlalchemy import create_engine
from sklearn.model_selection import train_test_split

In [2]:
# Conexion a la bd
conn = mysql.connector.connect(
    host="mysql",
    user="airflow",
    password="airflow",
    database="airflow"
)

cursor = conn.cursor()
engine = create_engine("mysql+mysqlconnector://airflow:airflow@mysql/airflow")

# Ejecuta sql
query = """
        WITH max_batch AS (
            SELECT *,
            MAX(batch_number) OVER () AS max_batch_number
            FROM raw_data
        )
        SELECT
        *
        FROM max_batch
        WHERE batch_number = max_batch_number        
        ;
        """
df = pd.read_sql_query(query, conn)

  df = pd.read_sql_query(query, conn)


In [3]:
CATEGORICAL_FEATURES = ["brokered_by",
                        "status",
                        "street",
                        "city",
                        "state",
                        "zip_code",
                        "prev_sold_date"]

NUMERICAL_FEATURES = ["price",
                      "bed",
                      "bath",
                      "acre_lot",
                      "house_size"]

In [4]:
# Impute missing values for categorical features with the mode
for feature in CATEGORICAL_FEATURES:
    mode_value = df[feature].mode()[0]
    df[feature].fillna(mode_value, inplace=True)

# Impute missing values for numerical features with the median
for feature in NUMERICAL_FEATURES:
    median_value = df[feature].median()
    df[feature].fillna(median_value, inplace=True)

In [5]:
# Record the initial number of rows
initial_rows = df.shape[0]

# Remove values outside the 0.25th and 99.95th percentiles
for feature in NUMERICAL_FEATURES:
    lower_bound = df[feature].quantile(0.0025)
    upper_bound = df[feature].quantile(0.9995)
    df = df[(df[feature] >= lower_bound) & (df[feature] <= upper_bound)]

# Record the number of rows after filtering
final_rows = df.shape[0]

# Calculate the number of rows and percentage of rows eliminated
rows_eliminated = initial_rows - final_rows
percent_eliminated = (rows_eliminated / initial_rows) * 100

# Display the filtered DataFrame and the elimination stats
print(f"Number of rows eliminated: {rows_eliminated}")
print(f"Percentage of rows eliminated: {percent_eliminated:.2f}%")

Number of rows eliminated: 2695
Percentage of rows eliminated: 0.84%


In [6]:
# Unique key columns
unique_key = ['street', 'city', 'state', 'zip_code', 'price', 'brokered_by']
initial_size = len(df)
# Sort DataFrame by 'prev_sold_date' in descending order
df = df.copy().sort_values(by='prev_sold_date', ascending=False)

# Drop duplicates based on unique key and keep the last occurrence
df = df.copy().drop_duplicates(subset=unique_key, keep='last')
size_no_duplicates = len(df)
# Count the number of duplicates
num_duplicates = size_no_duplicates - initial_size

# Calculate the percentage of duplicates
percent_duplicates = (num_duplicates / initial_size) * 100

# Display the number and percentage of duplicates
print(f"Number of duplicates: {num_duplicates}")
print(f"Percentage of duplicates: {percent_duplicates:.2f}%")

Number of duplicates: -21877
Percentage of duplicates: -6.88%


In [7]:
# Check table existence and insert data
with engine.connect() as conn:
    table_exists = engine.dialect.has_table(conn, 'clean_data')
    if not table_exists:
        print("La tabla 'clean_data' no existe.")
        df.iloc[:0].to_sql('clean_data', con=engine, if_exists='replace', index=False)
    else:
        conn = mysql.connector.connect(host="mysql",user="airflow",password="airflow",database="airflow")        
        existing_batches_query = "SELECT DISTINCT batch_number FROM clean_data;"
        existing_batches = pd.read_sql_query(existing_batches_query, conn)
        existing_batches_set = set(existing_batches['batch_number'])                
        
        # Filter DataFrame to only include rows with batch_numbers not in clean_data
        df_to_insert = df[~df['batch_number'].isin(existing_batches_set)]

        # Insert data into the table
        if not df_to_insert.empty:
            df_to_insert.to_sql('clean_data', con=engine, if_exists='append', index=False, chunksize=10000)
            print("Datos insertados en 'clean_data'.")
        else:
            print("No hay nuevos datos para insertar en 'clean_data'.")

  existing_batches = pd.read_sql_query(existing_batches_query, conn)


Datos insertados en 'clean_data'.


In [10]:
df

Unnamed: 0,brokered_by,status,price,bed,bath,acre_lot,street,city,state,zip_code,house_size,prev_sold_date,batch_number,max_batch_number
29688,88236.0,for_sale,459900.0,5.0,4.0,1.44,1644679.0,Clarkesville,Georgia,30523.0,5000.0,2024-04-17,4,5
35844,33717.0,for_sale,409000.0,4.0,1.0,0.54,1027298.0,Dawsonville,Georgia,30534.0,3391.0,2024-03-31,4,5
35934,105022.0,for_sale,385000.0,3.0,3.0,2.20,75361.0,Gillsville,Georgia,30543.0,2577.0,2023-10-31,4,5
107522,46047.0,for_sale,446990.0,4.0,3.0,0.12,252700.0,Caldwell,Idaho,83607.0,2695.0,2023-06-30,4,5
105565,8.0,for_sale,489400.0,2.0,2.0,13.00,1980905.0,Carpenter,Wyoming,82054.0,2496.0,2023-06-01,4,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
16147,101734.0,for_sale,450000.0,4.0,3.0,0.94,1029531.0,Bear,Delaware,19701.0,2025.0,1950-04-01,4,5
80676,52954.0,for_sale,429900.0,4.0,3.0,0.31,265522.0,Palos Heights,Illinois,60463.0,1756.0,1949-03-27,4,5
53330,16829.0,for_sale,450000.0,3.0,3.0,0.47,1593819.0,Fort Myers,Florida,33908.0,1596.0,1909-03-01,4,5
89190,13791.0,for_sale,405000.0,3.0,3.0,0.23,1232215.0,Norman,Oklahoma,73072.0,2617.0,1905-07-17,4,5
