In [None]:
# import all methods

import os

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.ml.regression import LinearRegression
from pyspark.ml.stat import Correlation
from pyspark.sql import SparkSession

from analysis import process_df, vectorize_data, stardize_data, perform_cross_validation
from data_processing import get_processed_data, save_dataset_to_db, get_data_set_from_db

In [None]:
# Database connection settings.
# Every value can be overridden through an environment variable so that real
# credentials never have to be stored in the notebook; the fallbacks are the
# local-development values this notebook has always used.
database_name = os.environ.get('DISTRITOS_DB_NAME', 'distritos')
database_user_name = os.environ.get('DISTRITOS_DB_USER', 'postgres')
database_password = os.environ.get('DISTRITOS_DB_PASSWORD', '123')
database_table = os.environ.get('DISTRITOS_DB_TABLE', 'distritos')

# Modelling settings.
# Predictor columns handed to process_df / vectorize_data.
features = ['POBLACION_2016', 'IDS', 'DENSIDAD', 'TCP',
            'CTAAM', 'CTM', 'CPPR', 'CPRU', 'CPHM', 'CPEXM',
            'TASA_ASALTO', 'TASA_HOMICIDIO', 'TASA_HURTO', 'TASA_ROBO']
# Target column handed to process_df.
label = 'CTAA'
# Column name passed to stardize_data and used as LinearRegression's featuresCol.
# NOTE(review): the literal looks like a misspelling of 'features_standard';
# left unchanged because it is a runtime column name - confirm nothing else
# depends on the exact spelling before correcting it.
features_standard = 'features_starndard'

In [None]:
# Merging and processing all data

# Local Spark session with the PostgreSQL JDBC driver jar on the driver and
# executor classpaths. appName is set exactly once: a second appName() call
# on the same builder silently replaces the first.
spark = (
    SparkSession.builder
    .master('local')
    .appName("Basic JDBC pipeline")
    .config("spark.driver.extraClassPath", "postgresql-42.1.4.jar")
    .config("spark.executor.extraClassPath", "postgresql-42.1.4.jar")
    .getOrCreate()
)

# Build the district-level dataset from the CSV files.
# NOTE(review): the last path is presumably where the merged dataset is
# written - confirm against data_processing.get_processed_data.
distritos_df = get_processed_data(
    spark,
    'data/distritos.csv', 'data/crimenes.csv',
    'data/escuelas.csv', 'data/colegios.csv',
    'data/extranjeros_escuelas.csv', 'data/extranjeros_colegios.csv',
    'data/processed/dataset.csv',
)

# Quick look at a few columns of the result.
distritos_df.select(['POBLACION_2016', 'IDS', 'TASA_ROBO', 'CTAA']).show()

In [None]:
# Round-trip the dataset through the database: write it, then read it back

# The save and load helpers take the same four connection arguments.
db_connection_args = (database_name, database_user_name, database_password, database_table)

save_dataset_to_db(distritos_df, *db_connection_args)

# From here on distritos_df refers to the copy loaded from the database.
distritos_df = get_data_set_from_db(spark, *db_connection_args)


In [None]:
# Show the Pearson correlation matrix

# Prepare the modelling frame: select/clean columns, assemble the feature
# vector, then standardise it. The final `df` is reused by the training cell.
df = process_df(distritos_df, features + [label], label)
df = vectorize_data(df, features)
df = stardize_data(df, features_standard)

# Correlation.corr returns a one-row DataFrame whose single cell holds the
# correlation matrix. 'features' is presumably the vector column created by
# vectorize_data - confirm against analysis.vectorize_data.
pearson_matrix = Correlation.corr(df, 'features', 'pearson').collect()[0][0]

# Explicit, larger figure so the per-cell annotations stay legible, plus a
# title so the plot stands on its own; plt.show() avoids a stray Axes repr.
fig, ax = plt.subplots(figsize=(12, 10))
sns.heatmap(pearson_matrix.toArray(), annot=True, fmt=".2f", cmap='viridis', ax=ax)
ax.set_title('Pearson correlation matrix')
plt.show()

In [None]:
# Train and evaluate a Linear Regression Model

# Fixed seed so the 90/10 train/test split - and therefore the reported
# metrics - do not change on every run of the notebook.
SPLIT_SEED = 42
train, test = df.randomSplit([0.9, 0.1], seed=SPLIT_SEED)

# The estimator reads the standardised feature vector and the 'label' column.
lr = LinearRegression(featuresCol=features_standard, labelCol='label')

# Model selection happens on the training split only.
cv_model = perform_cross_validation(train, lr)

# Score the selected model on the held-out split.
evaluation_summary = cv_model.bestModel.evaluate(test)

print('R2: ', evaluation_summary.r2)
print('MSE: ', evaluation_summary.meanSquaredError)