#### Download the 311 data from https://nycopendata.socrata.com/Social-Services/311-Service-Requests-from-2010-to-Present/erm2-nwe9. 311 is a service that New York City residents use to report complaints (noise, potholes, etc.) to the police.

#### Classify the type of complaint given the description written by people

In [5]:
import os

import findspark

# Locate the Spark installation. SPARK_HOME takes precedence when it is set,
# so the notebook is not tied to one machine; otherwise fall back to the
# original install path.
findspark.init(os.environ.get("SPARK_HOME", '/home/sama/spark-2.4.5-bin-hadoop2.7'))
import pyspark  # imported after findspark.init() on purpose

In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import unix_timestamp, from_unixtime, split
from pyspark.sql.functions import year, month, hour
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, IndexToString, Word2Vec
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as F
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF

In [7]:
# Single Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName("SR311_pred_model")
    .getOrCreate()
)

### Reading the CSV file into a Spark DataFrame


In [8]:
# Load the raw 311 export: the first row is the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("/home/user/Desktop/311.csv")
)

### Dropping irrelevant features


In [9]:
# Every column except the target ("Complaint Type") and the text ("Descriptor") is dropped.
droping_attribs = [
    column
    for column in df.columns
    if column not in ("Complaint Type", "Descriptor")
]

df = df.drop(*droping_attribs)
df.columns

['Complaint Type', 'Descriptor']

In [10]:
# Remove rows containing any null value, then report how many rows are left.
df = df.dropna()
df.count()

22340895

In [11]:
# Work on a ~5% random subset to keep fitting times manageable.
# The seed pins the subset so that row counts and metrics are reproducible
# when the notebook is re-run.
df = df.sample(fraction = 0.05, seed = 42)

In [12]:
# Number of rows currently in df.
df.count()

1117161

In [13]:
# Preview the first rows with full (untruncated) cell contents.
df.show(truncate = False)

+--------------+------------------------+
|Complaint Type|Descriptor              |
+--------------+------------------------+
|HEAT/HOT WATER|APARTMENT ONLY          |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|APARTMENT ONLY          |
|HEAT/HOT WATER|APARTMENT ONLY          |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|WATER LEAK    |DAMP SPOT               |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|DOOR/WINDOW   |WINDOW FRAME            |
|DOOR/WINDOW   |WINDOW FRAME            |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|HEAT/HOT WATER|ENTIRE BUILDING         |
|PLUMBING      |WATER SUPPLY            |
|PLUMBING      |WATER SUPPLY            |
|SAFETY        |CARBON MONOXIDE DETECTOR|
+--------------+------------------

# Extract features from description


In [48]:
# Text -> TF-IDF feature stages, applied in this order:
#   1. split "Descriptor" on non-word characters
#   2. remove stop words
#   3. term counts over a vocabulary of at most 10000 terms (minDF=5)
#   4. IDF re-weighting, written to the "features" column
regex_token = RegexTokenizer(
    inputCol="Descriptor",
    outputCol="words",
    pattern="\\W",
)
stopwords_remove = StopWordsRemover(
    inputCol="words",
    outputCol="clean",
)
count_vectors = CountVectorizer(
    inputCol="clean",
    outputCol="countvec",
    vocabSize=10000,
    minDF=5,
)
idf = IDF(
    inputCol="countvec",
    outputCol="features",
)

# NOTE(review): the vocabulary and IDF weights are fit on all of `df`. If a
# held-out split is taken afterwards, these statistics include the held-out rows.
# `pipeline` holds the *fitted* model, not the unfitted Pipeline.
stages = [regex_token, stopwords_remove, count_vectors, idf]
pipeline = Pipeline(stages=stages).fit(df)
final_data = pipeline.transform(df)
final_data.show(5)


+--------------+---------------+------------------+------------------+--------------------+--------------------+
|Complaint Type|     Descriptor|             words|             clean|            countvec|            features|
+--------------+---------------+------------------+------------------+--------------------+--------------------+
|HEAT/HOT WATER| APARTMENT ONLY| [apartment, only]|       [apartment]|   (1445,[21],[1.0])|(1445,[21],[3.903...|
|HEAT/HOT WATER|ENTIRE BUILDING|[entire, building]|[entire, building]|(1445,[3,7],[1.0,...|(1445,[3,7],[2.83...|
|HEAT/HOT WATER|ENTIRE BUILDING|[entire, building]|[entire, building]|(1445,[3,7],[1.0,...|(1445,[3,7],[2.83...|
|HEAT/HOT WATER|ENTIRE BUILDING|[entire, building]|[entire, building]|(1445,[3,7],[1.0,...|(1445,[3,7],[2.83...|
|HEAT/HOT WATER|ENTIRE BUILDING|[entire, building]|[entire, building]|(1445,[3,7],[1.0,...|(1445,[3,7],[2.83...|
+--------------+---------------+------------------+------------------+--------------------+-----

### Convert label to numerical index

In [49]:
# Fit a string -> numeric index mapping for the target column and append it as "label".
labelIndexer = StringIndexer(
    inputCol="Complaint Type",
    outputCol="label",
).fit(final_data)
data = labelIndexer.transform(final_data)

#### Split into train and test sets

In [66]:
# Keep only the two columns the classifier needs: the feature vector and the numeric label.
final_ = data.select("features", "label")

In [51]:
# 70/30 random train/test split. The seed pins the split so that the
# reported accuracy is reproducible when the notebook is re-run.
(trainingData, testData) = final_.randomSplit([0.7, 0.3], seed=42)

### Train a logistic regression on data

In [52]:
# Logistic regression capped at 20 iterations, regularisation strength 0.3,
# elastic-net mixing 0.
lr = LogisticRegression(
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0,
)
lr_model = lr.fit(trainingData)

#### Transform the test set with the model

In [53]:
# Score the held-out rows with the fitted model.
predictions = lr_model.transform(testData)

In [58]:
# Bare expression: displays the DataFrame's column names and types.
predictions

DataFrame[features: vector, label: double, rawPrediction: vector, probability: vector, prediction: double]

#### Display predictions

In [62]:
# Show class probabilities next to the true label and the predicted label.
predictions.select("probability","label","prediction").show()

+--------------------+-----+----------+
|         probability|label|prediction|
+--------------------+-----+----------+
|[0.06654629112094...|171.0|       0.0|
|[0.43170164563788...|  0.0|       0.0|
|[0.43170164563788...| 17.0|       0.0|
|[0.21031851655374...|  0.0|       0.0|
|[0.47214307556650...|  0.0|       0.0|
|[0.07461309686581...| 29.0|      29.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
|[0.03123214225314...|  1.0|       1.0|
+--------------------+-----+----------+
only showing top 20 rows



### Evaluate accuracy

In [63]:
# Accuracy of "prediction" against "label" on the scored test rows.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
evaluator.evaluate(predictions)

0.8621256844818872