In [1]:
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--jars /home/pklemenkov/hsu/lectures/kafka-clients-0.10.0.1.jar,/home/pklemenkov/hsu/lectures/spark-sql-kafka-0-10_2.11-2.3.0.jar pyspark-shell'
os.environ["PYSPARK_PYTHON"]='python3'
os.environ["SPARK_HOME"]='/opt/cloudera/parcels/SPARK2/lib/spark2/'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.6-src.zip'))
os.environ["PYSPARK_PYTHON"] = 'python3'
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
      /_/

Using Python version 3.4.3 (default, Nov 17 2016 01:08:31)
SparkSession available as 'spark'.


In [2]:
data = spark.read.parquet("/user/pklemenkov/lectures/lecture05/credit_fraud/")

In [3]:
data.printSchema()

root
 |-- time: integer (nullable = true)
 |-- amountRange: integer (nullable = true)
 |-- label: integer (nullable = true)
 |-- pcaVector: vector (nullable = true)



We'll be using 3 columns of this dataset:


- `pcaVector`: The PCA transformation of raw transaction data. For this example we'll assume that this PCA transformation occurs as part of some data pipeline before the data reaches us.
- `amountRange`: A value between 0 and 7. The approximate amount of a transaction. The values correspond to 0-1, 1-5, 5-10, 10-20, 20-50, 50-100, 100-200, and 200+ in dollars.
- `label`: 0 or 1. Indicates whether a transaction was fraudulent.

We want to build a model that will predict the label using the `pcaVector` and `amountRange` data. We'll do this by using a pipeline with 3 stages.


1. A `OneHotEncoderEstimator` to build a vector from the `amountRange` column. 
2. A `Vector` assembler to merge our `pcaVector` and `amountRange` vector into our `features` vector. 
3. A `GBTClassifier` to server as our `Estimator`.

Let's start by creating the objects that represent these stages.

In [4]:
from pyspark.ml.feature import OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.classification import GBTClassifier

In [5]:
one_hot = OneHotEncoderEstimator(inputCols=["amountRange"], outputCols=["amountVect"])

In [6]:
assembler = VectorAssembler(inputCols=["amountVect", "pcaVector"], outputCol="features")

In [7]:
estimator = GBTClassifier()

In [8]:
from pyspark.ml.feature import VectorSizeHint

In [9]:
hint = VectorSizeHint(inputCol="pcaVector", size=28)

In [10]:
train, test = data.randomSplit([0.8, 0.2])

In [11]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
  one_hot,
  hint,
  assembler,
  estimator
])

In [12]:
pipeline_model = pipeline.fit(train)

In [13]:
testDataPath = "/user/pklemenkov/credit-card-fraud-test-data"
test.repartition(20).write.mode("overwrite").parquet(testDataPath)

In [14]:
from pyspark.sql.types import *
from pyspark.ml.linalg import VectorUDT

In [15]:
schema = StructType(fields=[
  StructField("time", IntegerType()),
  StructField("amountRange", IntegerType()),
  StructField("label", IntegerType()),
  StructField("pcaVector", VectorUDT())
])

In [16]:
streamingData = spark.readStream\
                     .schema(schema)\
                     .option("maxFilesPerTrigger", 1)\
                     .parquet(testDataPath)

In [17]:
streamingRates = pipeline_model.transform(streamingData).groupBy("label").count()

In [18]:
streamingRates

DataFrame[label: int, count: bigint]

In [19]:
streaming_query = streamingRates.writeStream\
                                .format("memory")\
                                .outputMode("complete")\
                                .queryName("labels")\
                                .start()

In [23]:
import time
from IPython.display import clear_output

In [24]:
while True:
    clear_output()
    spark.sql("select * from labels").show()
    time.sleep(5)

+-----+-----+
|label|count|
+-----+-----+
|    1|   90|
|    0|57125|
+-----+-----+



KeyboardInterrupt: 

In [25]:
streaming_query.stop()