Skip to content

Latest commit

 

History

History
64 lines (48 loc) · 2.52 KB

Script_Spark.md

File metadata and controls

64 lines (48 loc) · 2.52 KB

import pandas as pd from pyspark.mllib.clustering import KMeans, KMeansModel from numpy import array

###Remove espaços do header
adclicksDataFrame = pd.read_csv('./ad-clicks.csv')
adclicksDataFrame = adclicksDataFrame.rename(columns=lambda x: x.strip())
adclicksDataFrame.head(n=2)

###Adiciona a coluna adCount
adclicksDataFrame['adCount'] = 1
adclicksDataFrame.head(n=2)

###Remove espaços do header
buyclicksDataFrame = pd.read_csv('./buy-clicks.csv')
buyclicksDataFrame = buyclicksDataFrame.rename(columns=lambda x: x.strip())
buyclicksDataFrame.head(n=2)

###Seleciona as colunas userID and price e remove as demais
PurchasesDataFrame = buyclicksDataFrame[['userId','price']]
PurchasesDataFrame.head(n=2)

###Seleciona as colunas userID e adCount e remove as demais
useradClicksDataFrame = adclicksDataFrame[['userId','adCount']]
useradClicksDataFrame.head(n=2)

###Cria novo arquivo agregando a coluna adCount pela coluna userId
PerUserDataFrame = useradClicksDataFrame.groupby('userId').sum()
PerUserDataFrame = PerUserDataFrame.reset_index()

###Renomeia as colunas
PerUserDataFrame.columns = ['userId','totalAdClicks']
PerUserDataFrame.head(n=2)

###Cria novo arquivo agregando a coluna price pela coluna userId
revenuePerUserDataFrame = PurchasesDataFrame.groupby('userId').sum()
revenuePerUserDataFrame = revenuePerUserDataFrame.reset_index()

###Renomeia as colunas
revenuePerUserDataFrame.columns = ['userId','revenue']
revenuePerUserDataFrame.head(n=2)

###Junta os dois arquivos (PerUserDataFrame + revenuePerUserDataFrame)
###userid, adCount, price
combinedDataFrame = PerUserDataFrame.merge(revenuePerUserDataFrame, on='userId')
combinedDataFrame.head(n=2)

###Cria o Dataset de treinamento (training)
###A coluna useriD será removida

trainingDataFrame = combinedDataFrame[['totalAdClicks','revenue']]
trainingDataFrame.head(n=2)

###Converte os Datasets em formato que pode ser entendido pelo Kmeans.train function
sqlContext = SQLContext(sc)
pdf = sqlContext.createDataFrame(trainingDataFrame)

###TotalAdClicks,revenue
parsedData = pdf.rdd.map(lambda line: array([line[0], line[1]]))
my_kmmodel = KMeans.train(parsedData, 2, maxIterations=10, runs=10, initializationMode="random")

###Veja abaixo o cluster que iremos analizar
print(my_kmmodel.centers)