In [1]:
#importa pacotes para usar SQL no contexto do Spark
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

In [2]:
#roda e lê a base
data = sqlContext.read.load('file:///C:/Spark/projetos/Data/base.csv', 
                          format='com.databricks.spark.csv', 
                          header='true', 
                          inferSchema='true')

In [3]:
#mostra as duas primeiras linhas da base
data.show(2)

+--------+-------------+--------+--------+------------------+-------+----------+-------------------+---------------+--------------------+--------------+---------------------+-------------+-----+-----+----------+-----------+--------------------+
|   CAMIS|          DBA|    BORO|BUILDING|            STREET|ZIPCODE|     PHONE|CUISINE DESCRIPTION|INSPECTION DATE|              ACTION|VIOLATION CODE|VIOLATION DESCRIPTION|CRITICAL FLAG|SCORE|GRADE|GRADE DATE|RECORD DATE|     INSPECTION TYPE|
+--------+-------------+--------+--------+------------------+-------+----------+-------------------+---------------+--------------------+--------------+---------------------+-------------+-----+-----+----------+-----------+--------------------+
|50074025|    LILA CAFE|BROOKLYN|     911|        DEKALB AVE|  11221|3475292886|          Caribbean|     03/27/2018|Violations were c...|           10F| Non-food contact ...| Not Critical|   33| null|      null| 04/26/2018|Pre-permit (Non-o...|
|41573314|SABANA LOU

In [4]:
#imprime os rótulos e tipos de dado da base
data.printSchema()

root
 |-- CAMIS: integer (nullable = true)
 |-- DBA: string (nullable = true)
 |-- BORO: string (nullable = true)
 |-- BUILDING: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- ZIPCODE: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- CUISINE DESCRIPTION: string (nullable = true)
 |-- INSPECTION DATE: string (nullable = true)
 |-- ACTION: string (nullable = true)
 |-- VIOLATION CODE: string (nullable = true)
 |-- VIOLATION DESCRIPTION: string (nullable = true)
 |-- CRITICAL FLAG: string (nullable = true)
 |-- SCORE: integer (nullable = true)
 |-- GRADE: string (nullable = true)
 |-- GRADE DATE: string (nullable = true)
 |-- RECORD DATE: string (nullable = true)
 |-- INSPECTION TYPE: string (nullable = true)



In [5]:
#exclui os dados não rotulados
data = data[~data['CRITICAL FLAG'].isin(['Not Applicable'])]
#retira as colunas da tabela que não serão utilizadas na classificação
drop_list = ['CAMIS', 'DBA', 'BORO', 'BUILDING', 'STREET', 'ZIPCODE','CUISINE DESCRIPTION', 'PHONE', 'INSPECTION DATE', 'ACTION', 'VIOLATION CODE', 'SCORE', 'GRADE', 'GRADE DATE', 'RECORD DATE', 'INSPECTION TYPE']

In [6]:
#mostra as colunas que serão utilizadas na classificação
#CRITICAL FLAG É A COLUNA COM OS CLASSIFICADORES
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

+---------------------+-------------+
|VIOLATION DESCRIPTION|CRITICAL FLAG|
+---------------------+-------------+
| Non-food contact ...| Not Critical|
| Non-food contact ...| Not Critical|
| Non-food contact ...| Not Critical|
| Non-food contact ...| Not Critical|
| Food contact surf...|     Critical|
+---------------------+-------------+
only showing top 5 rows



In [7]:
#imprime os rótulos e tipos de dado da base
data.printSchema()

root
 |-- VIOLATION DESCRIPTION: string (nullable = true)
 |-- CRITICAL FLAG: string (nullable = true)



In [8]:
#conta quantas linhas há para cada classificador
from pyspark.sql.functions import col
data.groupBy("CRITICAL FLAG") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

+-------------+------+
|CRITICAL FLAG| count|
+-------------+------+
|     Critical|204546|
| Not Critical|162077|
+-------------+------+



In [9]:
#conta as principais descrições
data.groupBy("VIOLATION DESCRIPTION") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(5)

+---------------------+-----+
|VIOLATION DESCRIPTION|count|
+---------------------+-----+
| Non-food contact ...|52814|
| Facility not verm...|38420|
| Evidence of mice ...|26858|
| Food not protecte...|25364|
| Food contact surf...|25240|
+---------------------+-----+
only showing top 5 rows



In [10]:
#importa pacotes para tokenizar, remover stopwords e vetorizar
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="VIOLATION DESCRIPTION", outputCol="words", pattern="\\W")
# stop words
#usando stop list of 25 semantically non-selective words which are common in Reuters-RCV1.
add_stopwords = ["a","an", "and", "are", "as", "at", "be", "by", "for", "from", "has", "he", "in", "is", "it", "its", "of", "on", "that", "the", "to", "was", "were", "will", "with"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

In [11]:
#cria rótulos númericos para os classificadores
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "CRITICAL FLAG", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

In [12]:
# Forma o pipeline to para treinar os documentos.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

+---------------------+-------------+--------------------+--------------------+--------------------+-----+
|VIOLATION DESCRIPTION|CRITICAL FLAG|               words|            filtered|            features|label|
+---------------------+-------------+--------------------+--------------------+--------------------+-----+
| Non-food contact ...| Not Critical|[non, food, conta...|[non, food, conta...|(528,[0,1,2,4,5,6...|  1.0|
| Non-food contact ...| Not Critical|[non, food, conta...|[non, food, conta...|(528,[0,1,2,4,5,6...|  1.0|
| Non-food contact ...| Not Critical|[non, food, conta...|[non, food, conta...|(528,[0,1,2,4,5,6...|  1.0|
| Non-food contact ...| Not Critical|[non, food, conta...|[non, food, conta...|(528,[0,1,2,4,5,6...|  1.0|
| Food contact surf...|     Critical|[food, contact, s...|[food, contact, s...|(528,[1,2,5,6,7,3...|  0.0|
+---------------------+-------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [13]:
# Define as sementes para reprodutibilidade
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 257050
Test Dataset Count: 109573


In [14]:
#calcula o tf-idf
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawTF", numFeatures=10000)
tf = hashingTF.transform(trainingData)
idf = IDF(inputCol="rawTF", outputCol="IDF", minDocFreq=5) #minDocFreq: remove sparse terms
idfModel = idf.fit(tf) 
tfidf = idfModel.transform(tf)
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

In [15]:
#soma os vetores
from pyspark.ml.linalg import SparseVector, DenseVector

# Representacao do vetor de classes
df = dataset.select('label','features')
vetor_de_classes = df.rdd.mapValues(lambda v: v.toArray()) \
    .reduceByKey(lambda x, y: x + y) \
    .mapValues(lambda x: DenseVector(x)) \
    .toDF(["label", "features_sum"])
    
vetor_de_classes.show(2)

+-----+--------------------+
|label|        features_sum|
+-----+--------------------+
|  0.0|[213413.0,297513....|
|  1.0|[356079.0,119209....|
+-----+--------------------+



In [18]:
# preparo para o calculo da distancia euclidiana para classe 0.0
array0 = vetor_de_classes.filter('label = 0.0').collect()[0]['features_sum']
print(array0)

[213413.0,297513.0,121387.0,220272.0,55598.0,25257.0,28358.0,25570.0,8605.0,66065.0,0.0,62059.0,9689.0,123.0,81856.0,10723.0,55598.0,55598.0,36924.0,55600.0,54419.0,330.0,53716.0,330.0,330.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,50825.0,50876.0,42773.0,49698.0,11776.0,38366.0,0.0,0.0,608.0,0.0,0.0,0.0,0.0,0.0,37242.0,36712.0,36712.0,36712.0,36712.0,36712.0,6498.0,33076.0,26060.0,29311.0,25434.0,27981.0,25512.0,27675.0,27383.0,26772.0,1030.0,26438.0,25240.0,26042.0,23217.0,25463.0,25364.0,25364.0,25364.0,25364.0,25265.0,25248.0,25240.0,25240.0,25240.0,25240.0,25240.0,3146.0,18485.0,15179.0,23292.0,24022.0,23790.0,23790.0,115.0,0.0,23300.0,0.0,23296.0,23292.0,22938.0,97.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,18356.0,18356.0,18356.0,18356.0,18356.0,18356.0,18356.0,18356.0,16263.0,15276.0,0.0,13569.0,12491.0,8082.0,9105.0,6515.0,9644.0,3146.0,8819.0,2788.0,8494.0,8494.0,8320.0,8320.0,8314.0,8091.0,8074.0,8074.0,7836.0,7836.0,7762.0,7762.0,7704.0,2502.0,0.0,7071.0,7071.0,7071.0,7071.0,3538

In [19]:
# preparo para o calculo da distancia euclidiana para classe 1.0
array1 = vetor_de_classes.filter('label = 1.0').collect()[0]['features_sum']
print(array1)

[356079.0,119209.0,230316.0,0.0,105628.0,123876.0,113290.0,110890.0,111028.0,52814.0,115260.0,43134.0,78056.0,84578.0,0.0,67026.0,3495.0,3228.0,21675.0,577.0,707.0,53589.0,0.0,52814.0,52814.0,52818.0,52814.0,52814.0,52814.0,52814.0,52814.0,52814.0,52814.0,52814.0,52814.0,52814.0,775.0,0.0,7535.0,293.0,37458.0,5771.0,39194.0,39194.0,38420.0,38420.0,38420.0,38420.0,38420.0,38420.0,0.0,0.0,0.0,0.0,0.0,0.0,26934.0,0.0,4478.0,0.0,3775.0,1189.0,3175.0,0.0,0.0,0.0,25450.0,0.0,1042.0,0.0,2366.0,0.0,0.0,0.0,0.0,0.0,97.0,0.0,0.0,0.0,0.0,0.0,0.0,21675.0,6019.0,9064.0,775.0,0.0,0.0,0.0,23300.0,23300.0,0.0,23300.0,0.0,0.0,0.0,21675.0,21675.0,21675.0,21675.0,21675.0,21675.0,21675.0,21675.0,21675.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,459.0,14376.0,0.0,836.0,4996.0,2194.0,4124.0,0.0,6433.0,0.0,6019.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,4996.0,7428.0,0.0,0.0,0.0,0.0,3395.0,0.0,0.0,0.0,0.0,4025.0,2400.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,6433.0,2935.0,6074.0,6059.0,6019.0,0.0,4388.0

In [20]:
# calcula a distancia euclidiana
from scipy.spatial import distance
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

euc0 = udf(lambda features: distance.euclidean(features, array0), FloatType())
euc1 = udf(lambda features: distance.euclidean(features, array1), FloatType())

df = df.withColumn('distance0', euc0(df.features))
df = df.withColumn('distance1', euc1(df.features))
df.show(5)

+-----+--------------------+---------+---------+
|label|            features|distance0|distance1|
+-----+--------------------+---------+---------+
|  1.0|(528,[0,1,2,4,5,6...| 530766.2| 596561.5|
|  1.0|(528,[0,1,2,4,5,6...| 530766.2| 596561.5|
|  1.0|(528,[0,1,2,4,5,6...| 530766.2| 596561.5|
|  1.0|(528,[0,1,2,4,5,6...| 530766.2| 596561.5|
|  0.0|(528,[1,2,5,6,7,3...| 530767.7|596566.25|
+-----+--------------------+---------+---------+
only showing top 5 rows

