In [None]:
!pip install pyspark

In [None]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Build (or reuse) one local Spark session. getOrCreate() keeps this cell
# re-runnable: calling SparkContext('local') a second time in the same kernel
# raises "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

In [None]:
# Libraries for data manipulation
import numpy as np
import pandas as pd
from pathlib import Path

# Libraries for data visualisation
import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import seaborn as sns


%matplotlib inline
# The bundled seaborn style sheets were renamed to 'seaborn-v0_8-*' in
# matplotlib 3.6 and the old names later removed, so pick whichever spelling
# this matplotlib provides and fall back to the default style otherwise.
_colorblind_style = next(
    (name for name in ('seaborn-v0_8-colorblind', 'seaborn-colorblind')
     if name in plt.style.available),
    'default',
)
plt.style.use(_colorblind_style)

# Libraries for building classifiers
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from pyspark.sql.functions import *


import warnings
warnings.filterwarnings('ignore')

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive; the CSV paths
# used by this notebook all live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:

# Input CSV locations on the mounted Drive.
# file_location2 and file_location are both loaded as labelled data (origen = 0);
# file_location1 is loaded as the set of names to predict (origen = 2).
file_location2 = "/content/drive/MyDrive/PREDICT GENDER/fist_name_full.csv"
file_location = "/content/drive/MyDrive/PREDICT GENDER/complete_names_full.csv"
file_location1 = "/content/drive/MyDrive/PREDICT GENDER/names2predict_31_mayo.csv"
# Format name handed to spark.read.format(...)
file_type = "csv"


In [None]:

## Labelled data used to train and test the classifier
SAMPLE_SEED = 100  # fixed so the 7% sample below is the same on every run


def _read_labelled_names(path):
    """Read a ';'-delimited CSV with a header and return name, gender, origen.

    Every row is tagged with origen = 0, the marker this notebook uses for
    labelled (train/test) rows.
    """
    return (
        spark.read.format(file_type)
        .option("inferSchema", "true")
        .option("header", "true")
        .option("delimiter", ";")
        .load(path)
        .withColumn("origen", lit(0))
        .select('name', 'gender', 'origen')
    )


nd_02 = _read_labelled_names(file_location)
nd_01 = _read_labelled_names(file_location2)

# Stack both labelled sources, give every row an ID, then keep a 7% sample.
nd_0 = nd_02.union(nd_01)
nd_0 = nd_0.withColumn("ID", monotonically_increasing_id())
nd_0 = nd_0.select('ID', 'name', 'gender', 'origen')
nd_0 = nd_0.sample(fraction=0.07, seed=SAMPLE_SEED)


In [None]:

from pyspark.sql.functions import lit
# StringType is defined in pyspark.sql.types; importing it from
# pyspark.sql.functions only works while that module happens to re-expose it.
from pyspark.sql.types import StringType
file_type = "csv"

## Names to predict: tagged origen = 2, with an empty (null) string gender column
nd_2 = (
    spark.read.format(file_type)
    .option("inferSchema", "true")
    .option("header", "true")
    .option("delimiter", ";")
    .load(file_location1)
    .withColumn("origen", lit(2))
    .withColumn("gender", lit(None).cast(StringType()))
    .select('ID', 'name', 'gender', 'origen')
)

In [None]:

# Row counts: the sampled labelled set first, then the set to predict
for frame in (nd_0, nd_2):
    print(frame.count())
#5329203

280545
1835093


In [None]:
# Stack the labelled rows (origen = 0) and the rows to predict (origen = 2).
# unionByName pairs columns by name, so a different column order in either
# input cannot silently put values under the wrong column (union is positional).
nd_1 = nd_0.unionByName(nd_2)
print(nd_1.count())


2115638


In [None]:
# Check class imbalance among the labelled rows and compute per-class weights
import pandas as pd
from pyspark.sql.functions import col, lit, when

counts = nd_1.filter(nd_1.origen == 0).groupBy('gender').count().toPandas()
print(counts)

# Look each class up by label. The female count is read directly instead of
# being derived as "total - male", so a stray extra label (e.g. a null gender)
# cannot distort the weights, and a missing class fails with a clear message
# instead of an IndexError.
class_counts = counts.set_index('gender')['count']
missing = [g for g in ('male', 'female') if class_counts.get(g, 0) == 0]
if missing:
    raise ValueError("No labelled rows for gender(s): %s" % ", ".join(missing))

count_male = int(class_counts['male'])
count_female = int(class_counts['female'])
count_total = count_male + count_female

# Balanced weights: total / (number of classes * class count)
c = 2
weight_male = count_total / (c * count_male)
weight_female = count_total / (c * count_female)

# Append the weight column; rows whose gender is null or any other value
# (this includes every row to predict) get weight 0.
nd_1 = nd_1.withColumn(
    "weight",
    when(col("gender") == "male", lit(weight_male))
    .when(col("gender") == "female", lit(weight_female))
    .otherwise(lit(0.0)),
)

# Check everything seems ok
nd_1.select('gender', 'weight').where(col('gender') == 'male').show(3)

# Bring everything to pandas and lower-case the names. Null names become ''
# rather than raising AttributeError on None.lower(); rows are kept so the
# row count and order are unchanged.
ndf = nd_1.select("*").toPandas()
ndf['name'] = ndf['name'].fillna('').astype(str).str.lower()


   gender   count
0  female  166386
1    male  114159
+------+-----------------+
|gender|           weight|
+------+-----------------+
|  male|1.228746747956797|
|  male|1.228746747956797|
|  male|1.228746747956797|
+------+-----------------+
only showing top 3 rows



In [None]:
# Feature: name length
ndf['name_len'] = ndf['name'].str.len()

# Last character of every name. The [-1:] slice gives '' for an empty name,
# where indexing with [-1] would raise IndexError.
last_letter = ndf['name'].str[-1:]

# Feature: does the name end in a vowel?
ndf['last_letter_vowel'] = last_letter.isin(['a', 'e', 'i', 'o', 'u']).astype(int)

# Feature: final vowel group -> 1 for a/e/o, 2 for i/u, 0 for anything else
ndf['open_vowel'] = np.select(
    [last_letter.isin(['a', 'e', 'o']), last_letter.isin(['i', 'u'])],
    [1, 2],
    default=0,
)
ndf['vowel_a'] = last_letter.isin(['a']).astype(int)
ndf['vowel_oe'] = last_letter.isin(['o', 'e']).astype(int)

# Features: leading / trailing character n-grams
ndf['last_three'] = ndf['name'].str[-3:]
ndf['last_five'] = ndf['name'].str[-5:]
ndf['first3'] = ndf['name'].str[:3]
ndf['first5'] = ndf['name'].str[:5]

# MAGIC **Creating a new feature for calculating the number of vowels and consonents in a name**

# MAGIC Helper function for consonent and vowel calculation


def letter_class(name):
    """Count the vowels and consonants in ``name``.

    The count is case-insensitive and accent-insensitive ('É' and 'é' count as
    the vowel 'e', 'ñ' as the consonant 'n'). Characters that are not letters
    (spaces, digits, hyphens, ...) are ignored instead of being counted as
    consonants.

    Parameters
    ----------
    name : str
        The name to analyse; may be empty.

    Returns
    -------
    tuple of (int, int)
        ``(vowel_count, consonant_count)``.
    """
    import unicodedata

    vowels = frozenset('aeiou')
    vowel_counter = 0
    consonent_counter = 0
    # NFD splits an accented letter into its base letter plus a combining
    # mark; the mark is not alphabetic and is skipped by the isalpha() test.
    for letter in unicodedata.normalize('NFD', name.lower()):
        if not letter.isalpha():
            continue
        if letter in vowels:
            vowel_counter += 1
        else:
            consonent_counter += 1

    return vowel_counter, consonent_counter


def name_convertor(name_list):
    """Build the per-name feature table for a list of raw names.

    Features are computed on the lower-cased name with the same encodings the
    notebook applies to the training table: ``open_vowel`` is 1 for a final
    a/e/o, 2 for a final i/u and 0 otherwise. Empty names are accepted (their
    last-letter features are 0 and their n-grams are '').

    Parameters
    ----------
    name_list : list of str
        Names to convert.

    Returns
    -------
    pandas.DataFrame
        One row per name. ``name`` holds the names as passed in. The columns
        ``vowel_o``, ``weight`` and ``vowel_n`` are declared but never filled,
        so they contain only missing values.
    """
    ndf = pd.DataFrame([], columns=['name','name_len'
                                    ,'last_letter_vowel','open_vowel',
                                    'vowel_a','vowel_o', 'weight','vowel_n','last_three','last_five','first3','first5'])
    ndf['name'] = name_list

    # Work on a lower-cased copy so 'MARIA' and 'maria' get identical features.
    lowered = ndf['name'].apply(lambda x: x.lower())
    # x[-1:] is '' for an empty name, where x[-1] would raise IndexError.
    last_letter = lowered.apply(lambda x: x[-1:])

    ndf['name_len'] = lowered.apply(len)
    ndf['last_letter_vowel'] = last_letter.apply(lambda ch: 1 if ch in ('a', 'e', 'i', 'o', 'u') else 0)
    ndf['open_vowel'] = last_letter.apply(
        lambda ch: 1 if ch in ('a', 'e', 'o') else (2 if ch in ('i', 'u') else 0))
    ndf['vowel_a'] = last_letter.apply(lambda ch: 1 if ch in ('a',) else 0)
    ndf['vowel_oe'] = last_letter.apply(lambda ch: 1 if ch in ('o', 'e') else 0)
    ndf['last_three'] = lowered.apply(lambda x: x[-3:])
    ndf['last_five'] = lowered.apply(lambda x: x[-5:])
    ndf['first3'] = lowered.apply(lambda x: x[:3])
    ndf['first5'] = lowered.apply(lambda x: x[:5])

    return ndf

In [None]:
# Encode gender as an integer class: female -> 1, male -> 0,
# anything else (including a missing gender) -> 2
ndf['class'] = ndf['gender'].map({'female': 1, 'male': 0}).fillna(2).astype(int)

# Columns carried forward to the Spark feature pipeline
feature_columns = [
    'name_len', 'origen', 'last_letter_vowel', 'vowel_a', 'vowel_oe', 'open_vowel',
    'weight', 'class', 'last_three', 'first3', 'first5', 'last_five',
]
dataset = ndf[feature_columns]

dataset.shape

(2115638, 12)

In [None]:
# Convert the pandas feature table back into a Spark DataFrame.
# NOTE(review): Arrow is only used for this conversion when
# spark.sql.execution.arrow.pyspark.enabled is true; nothing visible in this
# notebook sets it — confirm whether the Arrow path is actually intended.
dataset_spark = spark.createDataFrame(dataset)

In [None]:

from pyspark.ml.feature import StringIndexer

# One indexer per n-gram column; each writes a numeric "<column>Index" column.
last_three_indexer = StringIndexer(outputCol="last_threeIndex", inputCol="last_three")
last_five_indexer = StringIndexer(outputCol="last_fiveIndex", inputCol="last_five")
first3_indexer = StringIndexer(outputCol="first3Index", inputCol="first3")
first5_indexer = StringIndexer(outputCol="first5Index", inputCol="first5")

# Fit each indexer on the current frame and append its output column, in turn.
for indexer in (last_three_indexer, last_five_indexer, first3_indexer, first5_indexer):
    fitted_indexer = indexer.fit(dataset_spark)
    dataset_spark = fitted_indexer.transform(dataset_spark)

In [None]:

from pyspark.ml.feature import OneHotEncoder

# One encoder per indexed n-gram column; each writes a "<column>_vec" vector column.
onehotencoder_last_three_vector = OneHotEncoder(outputCol="last_three_vec", inputCol="last_threeIndex")
onehotencoder_last_five_vector = OneHotEncoder(outputCol="last_five_vec", inputCol="last_fiveIndex")
onehotencoder_first3_vector = OneHotEncoder(outputCol="first3_vec", inputCol="first3Index")
onehotencoder_first5_vector = OneHotEncoder(outputCol="first5_vec", inputCol="first5Index")

# Fit and apply the encoders one after another on the growing frame.
for encoder in (
    onehotencoder_last_three_vector,
    onehotencoder_last_five_vector,
    onehotencoder_first3_vector,
    onehotencoder_first5_vector,
):
    fitted_encoder = encoder.fit(dataset_spark)
    dataset_spark = fitted_encoder.transform(dataset_spark)

In [None]:

from pyspark.sql.functions import log
## Column lists used to select data and to feed the model

# Columns kept for the labelled (train/test) frame
variables = [
    'label',
    'name_len', 'origen', 'open_vowel', 'vowel_a', 'vowel_oe',
    'weight', 'last_letter_vowel',
    "features",
    'last_three_vec', 'last_five_vec', 'first3_vec', 'first5_vec',
]

# Columns kept for the frame to predict (no label, no weight)
variables_new = [
    'name_len', 'origen', 'open_vowel', 'vowel_a', 'vowel_oe',
    'last_letter_vowel',
    "features",
    'last_five_vec', 'last_three_vec', 'first3_vec', 'first5_vec',
]

# Inputs assembled into the "features" vector
covariables = [
    'name_len', 'open_vowel', 'vowel_a', 'vowel_oe', 'last_letter_vowel',
    'last_five_vec', 'last_three_vec', 'first3_vec', 'first5_vec',
]




In [None]:
## Assemble the covariates that go into the model into one vector column
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
import math
from pyspark.sql.types import IntegerType,DoubleType

definitivo = dataset_spark.withColumnRenamed("class", "label")
definitivo = VectorAssembler(inputCols = covariables, outputCol="features").setHandleInvalid("keep").transform(definitivo)

## Training/test pool: labelled rows (origen == 0) with a usable response and a
## name of at least 4 characters. Label 2 is the code given to rows whose
## gender is neither 'female' nor 'male', so those rows are dropped here.
## A 30% sample is kept; the seed makes that sample reproducible across runs.
datos2 = (
    definitivo
    .filter(definitivo.name_len >= 4)
    .filter(definitivo.origen == 0)
    .filter(definitivo.label != 2)
    .sample(withReplacement=False, fraction=0.3, seed=100)
)
datos2 = datos2.select(variables)

## Rows to predict (origen == 2)
datos3 = definitivo.filter(definitivo.origen == 2)
datos3 = datos3.select(variables_new)


In [None]:
# Row counts: rows to predict first, then the labelled training/test pool
for frame in (datos3, datos2):
    print(frame.count())

1835093
84073


In [None]:
# Show the labelled frame; the recorded output of this cell is the DataFrame's
# column/type summary, not its rows.
display(datos2)

DataFrame[label: bigint, name_len: bigint, origen: bigint, open_vowel: bigint, vowel_a: bigint, vowel_oe: bigint, weight: double, last_letter_vowel: bigint, features: vector, last_three_vec: vector, last_five_vec: vector, first3_vec: vector, first5_vec: vector]

In [None]:
from pyspark.ml.feature import StringIndexer, VectorIndexer

# Index the label column. The forests in this notebook are trained with
# labelCol="label", so their predictions are raw label values (0.0 / 1.0), and
# those predictions are decoded with IndexToString(labels=labelIndexer_t.labels).
# Ordering the indexer's labels alphabetically makes labels[0] == '0' and
# labels[1] == '1', so prediction i decodes to label 'i'. With the default
# frequency ordering the most frequent class becomes labels[0] and the decoded
# predictedLabel can come out swapped.
labelIndexer_t = StringIndexer(
    inputCol="label",
    outputCol="indexedLabel",
    stringOrderType="alphabetAsc",
).fit(datos2)

# Index categorical entries of the assembled feature vector (output column: indexedFeatures)
featureIndexer_t = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(datos2)

In [None]:
# Drop the helper column 'origen'. datos2 was already built from origen == 0
# rows with name_len >= 4 and datos3 from origen == 2 rows, so filtering on
# those conditions again is redundant. Dropping by column *name* is a no-op if
# the column is already gone, which keeps this cell re-runnable (referencing
# datos2.origen after a first run raises because the column no longer exists).
datos2 = datos2.drop('origen')
datos3 = datos3.drop('origen')

print(datos3.count())

## Split into training and test sets.
## Both parts are cached so that every later fit/evaluation reuses the same
## materialised rows instead of recomputing the whole upstream plan.
(trainSet_g1, testSet_g1) = datos2.randomSplit([0.8, 0.2], seed=100)
trainSet_g1 = trainSet_g1.cache()
testSet_g1 = testSet_g1.cache()
print(trainSet_g1.count())
print(testSet_g1.count())

In [None]:
#=====================================================================================================#
#   Grid search over tree depth and number of trees for the Random Forest (single train/test split)  #
#=====================================================================================================#

from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np
from pyspark.ml.feature import IndexToString

## Tree depths to try
deep =[5,10,15,20,25,29]

## Numbers of trees to try
arboles =[10, 100,200]

## Results, one entry per (depth, trees) combination
scores_1 = []   # accuracy on the test split
scores = []     # accuracy on the training split
arb = []        # number of trees
prof = []       # tree depth

# These two objects do not depend on the hyper-parameters, so build them once.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer_t.labels)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

for k in deep:
    for a in arboles:
        ## Fit the forest for this combination
        model = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees = a, maxDepth = k,
                                       weightCol='weight', impurity = 'entropy').fit(trainSet_g1)

        # Chain the fitted indexers, the fitted forest and the label decoder
        pipeline = Pipeline(stages=[labelIndexer_t, featureIndexer_t, model, labelConverter])
        model = pipeline.fit(trainSet_g1)

        ## Accuracy on the training split
        predictions = model.transform(trainSet_g1)
        accuracy = evaluator.evaluate(predictions)
        scores.append(accuracy)

        ## Accuracy on the test split
        prediction_1 = model.transform(testSet_g1)
        accuracy_1 = evaluator.evaluate(prediction_1)
        scores_1.append(accuracy_1)

        ## Record the hyper-parameters of this run
        arb.append(a)
        prof.append(k)

# `sqlContext` is not defined anywhere in this notebook, so using it here raised
# NameError only after every model had been trained; use the `spark` session.
error_rf = spark.createDataFrame(
    list(zip(arb, prof, scores, scores_1)),
    schema = ['árboles','prof','accuracy_train','accuracy_test'],
)
display(error_rf)


In [None]:
#=============================================================================#
#        Fit the Random Forest with the chosen hyper-parameters               #
#=============================================================================#

from pyspark.ml.classification import RandomForestClassifier
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import numpy as np
from pyspark.ml.feature import IndexToString
import sys
import os

## Train the forest on the training split
forest = RandomForestClassifier(
    impurity='entropy',
    weightCol='weight',
    maxDepth=29,
    numTrees=100,
    featuresCol="features",
    labelCol="label",
)
model = forest.fit(trainSet_g1)

# Decoder from the numeric prediction to the label strings held by the indexer
labelConverter = IndexToString(labels=labelIndexer_t.labels, outputCol="predictedLabel", inputCol="prediction")

# Bundle the fitted indexers, the fitted forest and the decoder
stages = [labelIndexer_t, featureIndexer_t, model, labelConverter]
pipeline = Pipeline(stages=stages)

# Produce the PipelineModel that applies the stages in order
model = pipeline.fit(trainSet_g1)
    


In [None]:
# Step 4: Save the model
# `model` is a Spark PipelineModel, which has no Keras-style save_weights();
# it is persisted with its ML writer. The target is written as a directory and
# overwrite() lets the cell be re-run.
model.write().overwrite().save('Gender.h5')

In [None]:
# Copy the saved model artefact 'Gender.h5' from the runtime to the project folder on Drive
!cp -r 'Gender.h5'  '/content/drive/MyDrive/PREDICT GENDER/MODELGender' #

In [None]:
# Copy the Drive folder '.../PREDICT GENDER/MODEL' into the runtime as 'boyorgirl.h5'.
# NOTE(review): the copy to Drive in this notebook targets '.../MODELGender',
# not '.../MODEL' — confirm which Drive path actually holds the saved model.
!cp -r '/content/drive/MyDrive/PREDICT GENDER/MODEL' 'boyorgirl.h5'

In [None]:
## Score the training split, the whole labelled pool and the test split
predictions_train = model.transform(trainSet_g1)
predictions_full = model.transform(datos2)
prediction_test = model.transform(testSet_g1)

## Accuracy of each set of predictions against the observed label
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="label",
)
accuracy_train, accuracy_test, accuracy_full = (
    evaluator.evaluate(scored)
    for scored in (predictions_train, prediction_test, predictions_full)
)

print("Accuracy in training data = %g" % accuracy_train)
print("Accuracy in testing data = %g" % accuracy_test)
print("Accuracy in full data = %g" % accuracy_full)

In [None]:
## Load the persisted pipeline back from Drive
from pyspark.ml import PipelineModel

# A Spark PipelineModel has no load_weights() (that is a Keras method); a
# saved pipeline is restored with PipelineModel.load, which returns a new model.
model = PipelineModel.load('/content/drive/MyDrive/PREDICT GENDER/MODEL/')

In [None]:


## Confusion matrix for training data

# Collect label and prediction together so each pair comes from the same row,
# and unpack the Row objects into plain values.
_train_rows = predictions_train.select('label', 'prediction').collect()
y_true = [row['label'] for row in _train_rows]
y_pred = [row['prediction'] for row in _train_rows]

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
import numpy as np
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt

# Flatten to 1-D arrays of plain values; this also unwraps one-field Row
# objects if y_true / y_pred came straight from collect().
actual = np.ravel(y_true)
predicted = np.ravel(y_pred)

print(classification_report(actual, predicted))
print(confusion_matrix(actual, predicted))

# Row-normalised confusion table. It gets its own name so that sklearn's
# confusion_matrix function imported above is not overwritten.
df = pd.DataFrame({'y_Actual': actual, 'y_Predicted': predicted})
confusion_rates = pd.crosstab(df['y_Actual'], df['y_Predicted'], normalize='index',
                              rownames=['Actual'], colnames=['Predicted'])

sn.heatmap(confusion_rates, fmt='.2%', cmap='Blues', annot=True)
plt.show()


In [None]:
## Confusion matrix for testing data
from sklearn.metrics import classification_report, confusion_matrix

# Collect label and prediction together so each pair comes from the same row,
# and unpack the Row objects into plain values.
_test_rows = prediction_test.select('label', 'prediction').collect()
y_true = [row['label'] for row in _test_rows]
y_pred = [row['prediction'] for row in _test_rows]

print(classification_report(y_true, y_pred))

# Row-normalised confusion table, named so that sklearn's confusion_matrix
# function is not overwritten.
df = pd.DataFrame({'y_Actual': y_true, 'y_Predicted': y_pred})
confusion_rates = pd.crosstab(df['y_Actual'], df['y_Predicted'], normalize='index',
                              rownames=['Actual'], colnames=['Predicted'])

sn.heatmap(confusion_rates, fmt='.2%', cmap='Blues', annot=True)
plt.show()

In [None]:
## Confusion matrix for full data
from sklearn.metrics import classification_report, confusion_matrix

# Collect label and prediction together so each pair comes from the same row,
# and unpack the Row objects into plain values.
_full_rows = predictions_full.select('label', 'prediction').collect()
y_true = [row['label'] for row in _full_rows]
y_pred = [row['prediction'] for row in _full_rows]

print(classification_report(y_true, y_pred))

# Row-normalised confusion table, named so that sklearn's confusion_matrix
# function is not overwritten.
df = pd.DataFrame({'y_Actual': y_true, 'y_Predicted': y_pred})
confusion_rates = pd.crosstab(df['y_Actual'], df['y_Predicted'], normalize='index',
                              rownames=['Actual'], colnames=['Predicted'])

sn.heatmap(confusion_rates, fmt='.2%', cmap='Blues', annot=True)
plt.show()


In [None]:
# import required modules
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Conv2D, Dense, Flatten, Dropout
from tensorflow.keras.layers import GlobalMaxPooling2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.models import Model

In [None]:
from keras.models import load_model

In [None]:

from keras.models import load_model
from pyspark.ml import PipelineModel

# `load_weights` is not defined or imported anywhere in this notebook, so the
# previous call raised NameError. The model used below is a Spark pipeline
# (it is applied with .transform), which is restored with PipelineModel.load.
model = PipelineModel.load("boyorgirl.h5")

In [None]:
# Predicting on a new data set

## Score the names to predict and keep the predictions as plain values:
## collect() returns Row objects, and putting those straight into a DataFrame
## stores the Row objects themselves in the column.
y_pred = [row['prediction'] for row in model.transform(datos3).select('prediction').collect()]

df = pd.DataFrame({'y_Predicted': y_pred})
df2 = nd_2.select("*").toPandas()

# The predictions carry no key, so they are attached to the input rows by
# position. At minimum the two tables must have the same number of rows;
# otherwise concat would silently pad the shorter one with missing values.
# NOTE(review): positional pairing also assumes both tables are in the same
# row order — carrying 'ID' through the feature pipeline and joining on it
# would remove that assumption.
if len(df) != len(df2):
    raise ValueError(
        "Prediction count (%d) does not match input row count (%d)" % (len(df), len(df2))
    )

df_c = pd.concat([df2, df], axis=1)

df_c[df_c.name == 'JOSECcarELESTINO']


In [None]:

# Write the predictions table to a CSV file in the runtime's working directory
df_c.to_csv("sex_predited.csv")

In [None]:
# Copy the predictions CSV to the project folder on Drive under a new file name
!cp -r 'sex_predited.csv'  '/content/drive/MyDrive/PREDICT GENDER/sex_predited_Mig.csv' #