# Kaggle submission for challenge: Real or not? NLP with disaster Tweets

### First we need to initialize spark

In [35]:
import findspark
findspark.init()
import pyspark


In [36]:
import sparknlp 

spark = sparknlp.start()

print("Spark NLP version")
sparknlp.version()
print("Apache Spark version")
spark.version

Spark NLP version
Apache Spark version


'2.4.5'

### Now we import the train dataset and take a look, preventing null values to appear

In [37]:
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), False),
    StructField("keyword", StringType(), False),
    StructField("location", StringType(), False),
    StructField("text", StringType(), False),
    StructField("target", IntegerType(), False)
])

df = spark.read.option("multiline",True).csv("train.csv", header=True, schema=schema)
df = df.filter(df.text.isNotNull())
df = df.filter(df.target.isNotNull())
df = df.na.fill("")
df.show()

+---+-------+--------+--------------------+------+
| id|keyword|location|                text|target|
+---+-------+--------+--------------------+------+
|  1|       |        |Our Deeds are the...|     1|
|  4|       |        |Forest fire near ...|     1|
|  5|       |        |All residents ask...|     1|
|  6|       |        |13,000 people rec...|     1|
|  7|       |        |Just got sent thi...|     1|
|  8|       |        |#RockyFire Update...|     1|
| 10|       |        |#flood #disaster ...|     1|
| 13|       |        |I'm on top of the...|     1|
| 14|       |        |There's an emerge...|     1|
| 15|       |        |I'm afraid that t...|     1|
| 16|       |        |Three people died...|     1|
| 17|       |        |Haha South Tampa ...|     1|
| 18|       |        |#raining #floodin...|     1|
| 19|       |        |#Flood in Bago My...|     1|
| 20|       |        |Damage to school ...|     1|
| 23|       |        |      What's up man?|     0|
| 24|       |        |       I 

In [38]:
df.count()

7613

### Now we create a pretrained pipeline for text processing, adding lemmatizing, normalization and IF-IDF steps

In [39]:
from pyspark import SparkContext
sc =SparkContext.getOrCreate()

locale = sc._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

from sparknlp.annotator import Stemmer, Lemmatizer, LemmatizerModel, NorvigSweetingModel, SymmetricDeleteModel, Normalizer
from sparknlp import DocumentAssembler, Finisher
from pyspark.ml.feature import StopWordsRemover
from sparknlp.pretrained import PretrainedPipeline
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import CountVectorizer, NGram


sparknlp_pipeline = PretrainedPipeline(
    'explain_document_ml', lang='en').model

normalizer = Normalizer()\
    .setInputCols(['lemmas'])\
    .setOutputCol('normalized')\
    .setLowercase(True)

finisher = Finisher()\
    .setInputCols(['normalized'])\
    .setOutputCols(['normalized'])\
    .setOutputAsArray(True)


sw_remover = StopWordsRemover() \
    .setInputCol("normalized") \
    .setOutputCol("filtered")

count_vectorizer = CountVectorizer(inputCol='normalized', 
    outputCol='tf')

idfIgnore = IDF(minDocFreq=10) \
    .setInputCol("tf") \
    .setOutputCol("idf")

pipeline = Pipeline().setStages([
    sparknlp_pipeline, normalizer, finisher, sw_remover, count_vectorizer, idfIgnore])

explain_document_ml download started this may take some time.
Approx size to download 9.4 MB
[OK!]


### We create a second pipeline that includes the prediction algorithm, in this case a random forest classifier, and then train the model

In [40]:
from pyspark.ml.feature import IndexToString, StringIndexer
from pyspark.ml.classification import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#label_indexer = StringIndexer(inputCol='sentiment', outputCol='label').fit(df)

#predictor = NaiveBayes(featuresCol='idf', labelCol="target")
#predictor = SVM(featuresCol='idf', labelCol="target")
#predictor = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, featuresCol='idf', labelCol="target")
#predictor = DecisionTreeClassifier(labelCol="target", featuresCol="idf")
predictor = RandomForestClassifier(labelCol="target", featuresCol="idf")
#predictor = GBTClassifier(labelCol="label", featuresCol="idf", maxIter=10) # binary
#predictor = MultilayerPerceptronClassifier(maxIter=100, layers=[4,5,4,3], blockSize=128, seed=1234, featuresCol='idf')
#predictor = LinearSVC(maxIter=10, regParam=0.1, featuresCol='idf')


#prediction_deindexer = IndexToString(inputCol='prediction', outputCol='pred_newsgroup', 
#                                     labels=label_indexer.labels)

pipeline2 = Pipeline(stages=[
    pipeline, predictor
])


#paramGrid = ParamGridBuilder().build()
  #.addGrid(count_vectorizer.minDF, [10]) \
  #.addGrid(idfIgnore.minDocFreq, [10]).build()
  #.addGrid(predictor.numTrees, [10]) \
  

#crossval = CrossValidator(estimator=pipeline2,
#                          estimatorParamMaps=paramGrid,
#                          evaluator=BinaryClassificationEvaluator(metricName='areaUnderROC', labelCol='target'),
#                          numFolds=2)  

#model2 = crossval.fit(df)
model2 = pipeline2.fit(df)

In [41]:
print(predictor.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featureSubsetStrategy: The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to 'onethird' for regression), 'all' (use all features), 'onethird' (use 1/3 of the features), 'sqrt' (use sqrt(number of features)), 'log2' (use log2(number of features)), 

### We create a dataframe for the test data

In [43]:
from pyspark.sql.types import *

schema = StructType([
    StructField("id", StringType(), False),
    StructField("keyword", StringType(), False),
    StructField("location", StringType(), False),
    StructField("text", StringType(), False)
])

df_t = spark.read.option("multiline",True).csv("test.csv", header=True, schema=schema)
df_t.show()

+---+-------+--------------------+--------------------+
| id|keyword|            location|                text|
+---+-------+--------------------+--------------------+
|  0|   null|                null|Just happened a t...|
|  2|   null|                null|Heard about #eart...|
|  3|   null|                null|there is a forest...|
|  9|   null|                null|Apocalypse lighti...|
| 11|   null|                null|Typhoon Soudelor ...|
| 12|   null|                null|We're shaking...I...|
| 21|   null|                null|They'd probably s...|
| 22|   null|                null|   Hey! How are you?|
| 27|   null|                null|    What a nice hat?|
| 29|   null|                null|           Fuck off!|
| 30|   null|                null|No I don't like c...|
| 35|   null|                null|NOOOOOOOOO! Don't...|
| 42|   null|                null|No don't tell me ...|
| 43|   null|                null|           What if?!|
| 45|   null|                null|            Aw

### Now we generate the predictions using the model on the training dataset to check performance

In [44]:
train_predicted = model2.transform(df)
#test_predicted = model2.transform(df_t)

### To check performance we create a binary classification evaluator and evaluate on the training dataset

In [45]:
evaluator = BinaryClassificationEvaluator(metricName='areaUnderROC', labelCol='target')

In [46]:
print('areaUnderROC', evaluator.evaluate(train_predicted))

areaUnderROC 0.7546152550623892


### The performance metric could be better with hyperparameter tuning

### Now we make the predictions on the test dataset and save the submission file

In [47]:
df_t = df_t.na.fill("")
df_predicted = model2.transform(df_t).toPandas()

In [48]:
df_final = df_predicted[['id', 'prediction']]
df_final.columns = ['id', 'target']
df_final['target'] = df_final['target'].astype(int)

df_final.to_csv('submission.csv', index=False)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  This is separate from the ipykernel package so we can avoid doing imports until
