In [1]:
# findspark is initialised here, before the cells below import pyspark.
import findspark
findspark.init()
# Last expression of the cell: displays the Spark installation path findspark resolved.
findspark.find()

'C:\\Users\\Laptop\\Downloads\\spark-2.4.7-bin-hadoop2.7\\spark-2.4.7-bin-hadoop2.7'

In [2]:
import pyspark
from pyspark.sql import SparkSession
# Single session handle used by every later cell; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = SparkSession.builder.getOrCreate()

In [3]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField, IntegerType, BooleanType, DoubleType

In [4]:
# Explicit schema for the semicolon-delimited job-postings CSV.
# Every field is declared nullable: the schema Spark reports after loading this
# file marks all columns as nullable regardless of what is requested here, so a
# nullable=False declaration would only suggest a guarantee that is not enforced.
# The four flag columns are kept as strings ('t' / 'f' in the data) and are
# indexed later by the ML pipelines.
new_schema = StructType([
    StructField(name='title', dataType=StringType(), nullable=True),
    StructField(name='telecommuting', dataType=StringType(), nullable=True),
    StructField(name='has_company_logo', dataType=StringType(), nullable=True),
    StructField(name='has_questions', dataType=StringType(), nullable=True),
    StructField(name='fraudulent', dataType=StringType(), nullable=True),
    StructField(name='company_id', dataType=DoubleType(), nullable=True),
    StructField(name='full_description', dataType=StringType(), nullable=True),
])

In [5]:
# Read the CSV under "DataSet" with the explicit schema defined above.
# The file has a header row, uses ';' as the field separator, and is read with
# the multiline option enabled.
csv_reader = (
    spark.read
    .format("csv")
    .schema(new_schema)
    .option("header", True)
    .option("delimiter", ";")
    .option("multiline", True)
)
new_data = csv_reader.load("DataSet")

In [6]:
# Sanity check: print the column names, types and nullability Spark applied on load.
new_data.printSchema()

root
 |-- title: string (nullable = true)
 |-- telecommuting: string (nullable = true)
 |-- has_company_logo: string (nullable = true)
 |-- has_questions: string (nullable = true)
 |-- fraudulent: string (nullable = true)
 |-- company_id: double (nullable = true)
 |-- full_description: string (nullable = true)



In [7]:
# Preview the first rows; long string values are truncated in the printed table.
new_data.show()

+--------------------+-------------+----------------+-------------+----------+----------+--------------------+
|               title|telecommuting|has_company_logo|has_questions|fraudulent|company_id|    full_description|
+--------------------+-------------+----------------+-------------+----------+----------+--------------------+
|    Marketing Intern|            f|               t|            f|         f|      45.0|US, NY, New York ...|
|Customer Service ...|            f|               t|            f|         f|     104.0|NZ, , Auckland Su...|
|Commissioning Mac...|            f|               t|            f|         f|       6.0|US, IA, Wever  <h...|
|Account Executive...|            f|               t|            f|         f|      37.0|US, DC, Washingto...|
| Bill Review Manager|            f|               t|            t|         f|      25.0|US, FL, Fort Wort...|
|    Accounting Clerk|            f|               f|            f|         f|       0.0|US, MD,    <p><b>...|
|

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler,HashingTF, Tokenizer,IDF
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [None]:
# Baseline: random forest on two categorical flags only
# ('telecommuting', 'has_company_logo') predicting 'fraudulent'.

# Fixed seed for the train/test split and the forest so that re-running the
# notebook gives comparable numbers.
SEED = 42

categoricalColumns = ['telecommuting', 'has_company_logo']

# 70/30 train/test split on just the columns this model needs.
(trainingData, testData) = (
    new_data
    .select(*categoricalColumns, 'fraudulent')
    .randomSplit([0.7, 0.3], seed=SEED)
)

# One StringIndexer per categorical column: string value -> numeric index
# written to '<column>Index'.
stages = [
    StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    for categoricalCol in categoricalColumns
]

# The target is a string column too, so it is indexed into the numeric 'label'.
labelIndexer = StringIndexer(inputCol="fraudulent", outputCol="label")

# Collect the indexed columns into the single vector column the classifier reads.
assemblerInputs = [c + "Index" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, seed=SEED)

stages += [labelIndexer, assembler, rf]

pipeline = Pipeline(stages=stages)
model = pipeline.fit(trainingData)

predictions = model.transform(testData)
predictions.select("label", "features").show(5)

# Evaluate on the classifier's continuous scores ('rawPrediction'), not on the
# hard 0/1 'prediction' column: with hard labels the precision-recall curve is
# built from a single threshold, so the reported area does not reflect how well
# the model ranks postings.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName="areaUnderPR")
areaUnderPR = evaluator.evaluate(predictions)
print("areaUnderPR = %g" % (areaUnderPR))

In [None]:
# Text model on the posting title: tokenise -> hashed term frequencies ->
# random forest, tuned with 5-fold cross-validation.

# Fixed seed for the split, the forest and the fold assignment so that
# re-running the notebook gives comparable numbers.
SEED = 42

(training, test) = new_data.select('title', 'fraudulent').randomSplit([0.7, 0.3], seed=SEED)

# The target is a string column, so it is indexed into the numeric 'label'.
labelIndexer = StringIndexer(inputCol="fraudulent", outputCol="label")

tokenizer = Tokenizer(inputCol="title", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, seed=SEED)
pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF, rf])

# 3 x 3 = 9 parameter combinations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(rf.maxDepth, [2, 4, 6])
    .build()
)

# One evaluator for both model selection and the final test score, so the
# model is tuned on the same metric that is reported. It reads the continuous
# 'rawPrediction' scores; the hard 0/1 'prediction' column would give a
# precision-recall curve built from a single threshold.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName="areaUnderPR")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=SEED)

cvModel = crossval.fit(training)

# Score the best model found by cross-validation on the held-out test split.
prediction = cvModel.transform(test)
evaluator.evaluate(prediction)

In [None]:
# Text model on the full description: tokenise -> hashed term frequencies ->
# IDF weighting -> random forest, tuned with 5-fold cross-validation.

# Fixed seed for the split, the forest and the fold assignment so that
# re-running the notebook gives comparable numbers.
SEED = 42

(training, test) = new_data.select('full_description', 'fraudulent').randomSplit([0.7, 0.3], seed=SEED)

# The target is a string column, so it is indexed into the numeric 'label'.
labelIndexer = StringIndexer(inputCol="fraudulent", outputCol="label")

tokenizer = Tokenizer(inputCol="full_description", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=SEED)
pipeline = Pipeline(stages=[labelIndexer, tokenizer, hashingTF, idf, rf])

# 3 x 3 x 3 = 27 parameter combinations, each cross-validated on 5 folds and
# including forests of up to 1000 trees -- expect this cell to be slow.
paramGrid = (
    ParamGridBuilder()
    .addGrid(hashingTF.numFeatures, [10, 100, 1000])
    .addGrid(rf.maxDepth, [5, 10, 15])
    .addGrid(rf.numTrees, [10, 100, 1000])
    .build()
)

# One evaluator for both model selection and the final test score, so the
# model is tuned on the same metric that is reported. It reads the continuous
# 'rawPrediction' scores; the hard 0/1 'prediction' column would give a
# precision-recall curve built from a single threshold.
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName="areaUnderPR")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=SEED)

cvModel = crossval.fit(training)

# Score the best model found by cross-validation on the held-out test split.
prediction = cvModel.transform(test)
evaluator.evaluate(prediction)