# PySpark Setup and Initialization

In [1]:
import pandas as pd
import findspark
findspark.init('spark/spark-2.4.6-bin-hadoop2.7')

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml import Pipeline

In [2]:
# Build (or reuse) the SparkSession for this notebook. The MMLSpark artifact is
# requested through spark.jars.packages so that it is available to the session.
spark = (
    SparkSession.builder
    .appName("LightGBMApp2")
    .master("local[*]")
    # MMLSpark (Scala 2.11 build) and the Maven repository that hosts it.
    .config("spark.jars.packages", "com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    # Artifacts excluded while resolving the package above.
    .config(
        "spark.jars.excludes",
        "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.11,org.scalactic:scalactic_2.11,org.scalatest:scalatest_2.11",
    )
    # Memory settings: driver heap plus an off-heap allocation.
    .config("spark.driver.memory", "14g")
    .config("spark.memory.offHeap.enabled", True)
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext and display it as the cell output.
sc = spark.sparkContext
sc

In [9]:
# Teardown: stops the SparkContext created in the session cell above.
# NOTE(review): the cells below read data through `spark`, so they need a live
# context. Run this cell only after the rest of the notebook has finished;
# executing it in top-to-bottom order would break the cells that follow.
sc.stop()

# CODES

# Load CSV Data

In [3]:
# Input file and format.
# Dataset download page: https://www.kaggle.com/mlg-ulb/creditcardfraud
file_location = "creditcard.csv"
file_type = "csv"

# Reader settings: infer column types, treat the first row as the header,
# and split fields on commas.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Configure the reader in a single call, then load the file into a Spark DataFrame.
reader = spark.read.format(file_type).options(
    inferSchema=infer_schema,
    header=first_row_is_header,
    sep=delimiter,
)
df = reader.load(file_location)

# Show the first three rows as a pandas frame (rich display in the notebook).
preview_rows = df.take(3)
pd.DataFrame(preview_rows, columns=df.columns)

Unnamed: 0,Time,V1,V2,V3,V4,V5,V6,V7,V8,V9,...,V21,V22,V23,V24,V25,V26,V27,V28,Amount,Class
0,0,-1.359807,-0.072781,2.536347,1.378155,-0.338321,0.462388,0.239599,0.098698,0.363787,...,-0.018307,0.277838,-0.110474,0.066928,0.128539,-0.189115,0.133558,-0.021053,149.62,0
1,0,1.191857,0.266151,0.16648,0.448154,0.060018,-0.082361,-0.078803,0.085102,-0.255425,...,-0.225775,-0.638672,0.101288,-0.339846,0.16717,0.125895,-0.008983,0.014724,2.69,0
2,1,-1.358354,-1.340163,1.773209,0.37978,-0.503198,1.800499,0.791461,0.247676,-1.514654,...,0.247998,0.771679,0.909412,-0.689281,-0.327642,-0.139097,-0.055353,-0.059752,378.66,0


In [4]:
# Model inputs: columns V1..V28 plus Amount ("Time" is not included).
feature_cols = ["V" + str(i) for i in range(1, 29)] + ["Amount"]

# Assemble the individual feature columns into a single vector column named
# "features". (VectorAssembler comes from the import cell at the top of the
# notebook; the redundant mid-notebook re-import of StringIndexer was dropped
# because nothing in this cell uses it.)
va = VectorAssembler(inputCols=feature_cols, outputCol="features")
va_df = va.transform(df)

# Keep only the assembled feature vector and the label column.
final_df = va_df.select(["features", "Class"])

# 80/20 train/test split with a fixed seed.
train_df, test_df = final_df.randomSplit([0.8, 0.2], seed=7)

# Number of training rows (displayed as the cell output).
train_df.count()

227900

# Training LightGBM on Spark

In [5]:
import mmlspark
from mmlspark.lightgbm import LightGBMClassifier

In [6]:
# Binary LightGBM classifier trained on the assembled "features" vector with
# "Class" as the label.
# NOTE(review): earlyStoppingRound is set but no validation data is passed in
# this cell -- confirm that early stopping actually takes effect.
lightgbm_model = LightGBMClassifier(
    featuresCol="features",
    labelCol="Class",
    earlyStoppingRound=100,
    objective="binary",
)
model = lightgbm_model.fit(train_df)

# Score the held-out split and keep only the label and the predicted class.
y_pred = model.transform(test_df)
ac = y_pred.select("Class", "prediction")

# Accuracy = rows where the prediction equals the label / all scored rows.
# NOTE(review): if fraud labels are rare (presumably the case for this data --
# verify), plain accuracy mostly reflects the majority class; consider also
# reporting areaUnderPR with BinaryClassificationEvaluator.
n_correct = ac.where(F.col("Class") == F.col("prediction")).count()
n_total = ac.count()
n_correct / n_total

0.9980670216317852