### Imports

In [None]:
import pyspark.pandas as ps
from sklearn.preprocessing import MinMaxScaler
import numpy as np
from sklearn.preprocessing import StandardScaler
import time
from pyspark.sql import SparkSession
import psutil
import os

spark = SparkSession.builder.getOrCreate()

#### Helper funcs

In [None]:
def get_memory_usage():
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / (1024 * 1024)

def get_cpu_usage():
    process = psutil.Process(os.getpid())
    return process.cpu_percent(interval=0.1)

### Metrics

In [None]:
start = time.time()

start_mem = get_memory_usage()
start_cpu = psutil.cpu_percent(interval=1)
print(f"Start mem: {start_mem:.2f} MB")
print(f"Start cpu: {start_cpu:.2f} MB")



### Load Datasets

In [None]:
crime = ps.read_csv("datasets/state_crime.csv")
minimum_wage = ps.read_csv("datasets/min_wage.csv", encoding="cp1252")
wages_by_education = ps.read_csv("datasets/wages_by_education.csv")

### Tratamento

##### Valores Nulos

In [None]:
#print("Valores nulos dataset crime: ", crime.isna().sum())

In [None]:
#print("Valores nulos dataset minimum wage: ", minimum_wage.isna().sum())
minimum_wage = minimum_wage.drop(columns=["Footnote", "Department.Of.Labor.Uncleaned.Data"])

In [None]:
#print("Valores nulos dataset education: ", wages_by_education.isna().sum())

Valores nulos apenas no dataset sobre minimum_wage nas colunas

- Department.Of.Labor.Cleaned.Low.Value.2020.Dollars
- Department.Of.Labor.Cleaned.High.Value.2020.Dollars
- Footnote

No entantanto nesse dataset, valores 0 também são valores nulos



In [None]:
true_null = minimum_wage.replace(0, np.nan)
#print("Valores nulos dataset minimum wage: ", true_null.isna().sum())

#### Substituir valores nulos - Interpolação

In [None]:
for col in ['State.Minimum.Wage', 'State.Minimum.Wage.2020.Dollars', 
            'Department.Of.Labor.Cleaned.Low.Value', 'Department.Of.Labor.Cleaned.Low.Value.2020.Dollars',
            'Department.Of.Labor.Cleaned.High.Value', 'Department.Of.Labor.Cleaned.High.Value.2020.Dollars',
            ]:
    minimum_wage[col] = minimum_wage[col].interpolate(method='linear', limit_direction='both')


In [None]:
#print("Valores nulos dataset minimum wage: ", minimum_wage.isna().sum())


#### Normalizar

In [None]:
scaler = MinMaxScaler(feature_range=(0, 1))

def normalize_numerical_columns(df):
    pdf = df.to_pandas()
    
    numerical_columns = pdf.select_dtypes(include=['number']).columns.tolist()
    
    if 'Year' in numerical_columns:
        numerical_columns.remove('Year')
    
    pdf_scaled = pdf.copy()
    pdf_scaled[numerical_columns] = scaler.fit_transform(pdf[numerical_columns])
    
    return ps.from_pandas(pdf_scaled)

minimum_wage_scaled = normalize_numerical_columns(minimum_wage)
crime_scaled = normalize_numerical_columns(crime)
wages_by_education_scaled = normalize_numerical_columns(wages_by_education)


In [None]:
#print("Valores nulos dataset minimum wage: ", minimum_wage.isna().sum())


#### Merge

In [None]:
merged_data = crime.merge(minimum_wage, on=["Year", "State"], how="outer")
merged_data = merged_data.dropna()
merged_data.to_pandas().to_csv("datasets/merged_data.csv", index=False)

merged_data = merged_data.rename(columns={'Year': 'year'})
merged_data = merged_data.merge(wages_by_education, on=["year"], how="outer")
merged_data = merged_data.dropna()
merged_data.to_pandas().to_csv("datasets/merged_data_2.csv", index=False)

end = time.time()
final = end - start

end_mem = get_memory_usage()
end_cpu = psutil.cpu_percent(interval=1)
memory_total = end_mem - start_mem
print(f"Execution time: {final} seconds")
print(f"Final memory: {end_mem:.2f}")
print(f"Final cpu: {end_cpu:.2f}")
print(f"Total memory: {memory_total:.2f} MB")
print(f"CPU: {get_cpu_usage():.2f}%")

In [None]:
#print("Valores nulos datasets: ", merged_data.isna().sum())
