Spark is a big data solution that has been proven to be easier and faster than Hadoop MapReduce. The main difference between Spark and MapReduce is that Spark runs computations in memory during the later on the hard disk. It allows high-speed access and data processing, reducing times from hours to minutes.

스파크 역할
- computational engine, meaning it takes care of the scheduling, distributing and monitoring applications. Each task is done across various worker machines called computing cluster. A computer cluster refers to the division of tasks. In the end, all the tasks are aggregated to produce an output. The Spark admin gives a 360 overview of various Spark Jobs.

WHY use Spark?
- original process:
    - Load the data to the disk
    - Process / analyze the data
    - Build the machine learning model
    - Store the prediction back to disk
- The problem arises if the data scientist wants to process data that's too big for one computer.
    - 전체 데이터가 아니라 좋은 샘플링을 통해 모델을 만드는 작업을 해왔다.
        - 여기서 생기는 문제점
            - is the dataset reflecting the real world?
            - Does the data include a specific example?
            - Is the model fit for sampling?
- solution
    - split the problem up onto multiple computers.
        - Parallel computing comes with multiple problems as well.
    - Pyspark gives the data scientist an API that can be used to solve the parallel 

In [20]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

In [21]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hello").getOrCreate()

In [66]:
df_origin = spark.read.csv('data/Food_Inspections.csv', inferSchema = True, header=True)
'''
from pyspark.sql import SQLContext
import pandas as pd
temp_csv = pd.read_csv('data/Food_Inspections.csv')
temp_pyspark = sqlContext.createDataFrame(temp_csv)
'''

"\nfrom pyspark.sql import SQLContext\nimport pandas as pd\ntemp_csv = pd.read_csv('data/Food_Inspections.csv')\ntemp_pyspark = sqlContext.createDataFrame(temp_csv)\n"

In [37]:
df_origin.printSchema()

root
 |-- Inspection ID: integer (nullable = true)
 |-- DBA Name: string (nullable = true)
 |-- AKA Name: string (nullable = true)
 |-- License #: integer (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Risk: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zip: integer (nullable = true)
 |-- Inspection Date: string (nullable = true)
 |-- Inspection Type: string (nullable = true)
 |-- Results: string (nullable = true)
 |-- Violations: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Location: string (nullable = true)



In [51]:
df = df_origin.select(['Inspection ID','DBA name','Results','Violations']).collect()

In [54]:
df

[Row(Inspection ID=2362957, DBA name='DONA TOLA', Results='Pass w/ Conditions', Violations='3. MANAGEMENT, FOOD EMPLOYEE AND CONDITIONAL EMPLOYEE; KNOWLEDGE, RESPONSIBILITIES AND REPORTING - Comments: 2-201.11-OBSERVED NO EMPLOYEE HEALTH POLICY/TRAINING ON SITE. INSTRUCTED FACILITY TO ESTABLISH AN APPROPRIATE EMPLOYEE HEALTH POLICY/TRAINING SYSTEM AND MAINTAIN WITH VERIFIABLE DOCUMENTS ON SITE. PRIORITY FOUNDATION VIOLATION 7-38-010. NO CITATION ISSUED   | 47. FOOD & NON-FOOD CONTACT SURFACES CLEANABLE, PROPERLY DESIGNED, CONSTRUCTED & USED - Comments: 4-101.11(B, C, D, E)-OBSERVED FOAM SEALANT USED ON WALL ON RIGHT WHEN ACCESSING BASEMENT. FOAM SEALANT IS NOT PERMITTED AND MUST BE REMOVED. RESTORE WALL TO A SMOOTH EASILY CLEANABLE CONDITION. | 55. PHYSICAL FACILITIES INSTALLED, MAINTAINED & CLEAN - Comments: 6-201.11-FLOORS WALLS AND CIELING OF BASEMENT MUST BE SMOOTH AND EAILY CLEANABLE IF ANY DRY FOOD STORAGE OR LIQUOR IS STORED IN BASEMENT. '),
 Row(Inspection ID=2362958, DBA name=

In [56]:
df_origin.columns

['Inspection ID',
 'DBA Name',
 'AKA Name',
 'License #',
 'Facility Type',
 'Risk',
 'Address',
 'City',
 'State',
 'Zip',
 'Inspection Date',
 'Inspection Type',
 'Results',
 'Violations',
 'Latitude',
 'Longitude',
 'Location']

In [57]:
df_origin.count()

201919

In [59]:
df_origin.describe(['Longitude']).show()

+-------+-------------------+
|summary|          Longitude|
+-------+-------------------+
|  count|             201193|
|   mean| -87.67649969463984|
| stddev|0.05887819924318163|
|    min| -87.91442843927047|
|    max|  -87.5250941359867|
+-------+-------------------+



In [62]:
df_origin.agg({"Longitude":"max"}).collect()

[Row(max(Longitude)=-87.5250941359867)]

In [65]:
from pyspark.sql import functions as F
df_origin.agg(F.min(df_origin.Longitude)).collect()

[Row(min(Longitude)=-87.91442843927047)]

cache()
- sql 구문으로 dataframe을 만들면 그 뒤에 한번 액션을 취할때마다 dataframe이 만들어지고 해당 액션이 끝나게 되면 메모리상에서 사라지게 된다. 따라서 자주 불리게 되는 dataframe같은 경우에는 cache라는 함수를 사용하여 메모리에 남겨두는 게 실행속도에 많은 도움이 된다.

In [70]:
from pyspark.ml.clustering import KMeans, BisectingKMeans
from pyspark.ml.classification import GBTClassifier

In [71]:
from pyspark.ml.feature import VectorAssembler

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

from pyspark.sql import SQLContext
from pyspark import SparkContext

sc = SparkContext()
# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument


def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    if (values[3] > values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)


# Load the raw HVAC.csv file, parse it using the function
data = ptextFile(
    "wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()

AttributeError: 'PipelinedRDD' object has no attribute 'toDF'

In [8]:
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

NameError: name 'training' is not defined

In [11]:
# Initializing a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("diabeties").config("spark.some.config.option","some-value").getOrCreate()

In [12]:
raw_data = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r"data/diabetes.csv")
raw_data.columns

['Pregnancies',
 'Glucose',
 'BloodPressure',
 'SkinThickness',
 'Insulin',
 'BMI',
 'DiabetesPedigreeFunction',
 'Age',
 'Outcome']

In [13]:
raw_data.describe().select("Summary","Pregnancies","Glucose","BloodPressure").show()

+-------+------------------+-----------------+------------------+
|Summary|       Pregnancies|          Glucose|     BloodPressure|
+-------+------------------+-----------------+------------------+
|  count|               768|              768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|
|    min|                 0|                0|                 0|
|    max|                17|              199|               122|
+-------+------------------+-----------------+------------------+



In [14]:
import numpy as np
from pyspark.sql.functions import when
raw_data=raw_data.withColumn("Glucose",when(raw_data.Glucose==0,np.nan).otherwise(raw_data.Glucose))
raw_data=raw_data.withColumn("BloodPressure",when(raw_data.BloodPressure==0,np.nan).otherwise(raw_data.BloodPressure))
raw_data=raw_data.withColumn("SkinThickness",when(raw_data.SkinThickness==0,np.nan).otherwise(raw_data.SkinThickness))
raw_data=raw_data.withColumn("BMI",when(raw_data.BMI==0,np.nan).otherwise(raw_data.BMI))
raw_data=raw_data.withColumn("Insulin",when(raw_data.Insulin==0,np.nan).otherwise(raw_data.Insulin))
raw_data.select("Insulin","Glucose","BloodPressure","SkinThickness","BMI").show(5)

+-------+-------+-------------+-------------+----+
|Insulin|Glucose|BloodPressure|SkinThickness| BMI|
+-------+-------+-------------+-------------+----+
|    NaN|  148.0|         72.0|         35.0|33.6|
|    NaN|   85.0|         66.0|         29.0|26.6|
|    NaN|  183.0|         64.0|          NaN|23.3|
|   94.0|   89.0|         66.0|         23.0|28.1|
|  168.0|  137.0|         40.0|         35.0|43.1|
+-------+-------+-------------+-------------+----+
only showing top 5 rows



In [15]:
from pyspark.ml.feature import Imputer
imputer=Imputer(inputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],outputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"])
model=imputer.fit(raw_data)
raw_data=model.transform(raw_data)
raw_data.show(5)

+-----------+-------+-------------+------------------+-----------------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|     SkinThickness|          Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+------------------+-----------------+----+------------------------+---+-------+
|          6|  148.0|         72.0|              35.0|155.5482233502538|33.6|                   0.627| 50|      1|
|          1|   85.0|         66.0|              29.0|155.5482233502538|26.6|                   0.351| 31|      0|
|          8|  183.0|         64.0|29.153419593345657|155.5482233502538|23.3|                   0.672| 32|      1|
|          1|   89.0|         66.0|              23.0|             94.0|28.1|                   0.167| 21|      0|
|          0|  137.0|         40.0|              35.0|            168.0|43.1|                   2.288| 33|      1|
+-----------+-------+-------------+------------------+-----------------+----+---

In [16]:
cols=raw_data.columns
cols.remove("Outcome")
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
raw_data=assembler.transform(raw_data)
raw_data.select("features").show(truncate=False)

+-----------------------------------------------------------------------------------+
|features                                                                           |
+-----------------------------------------------------------------------------------+
|[6.0,148.0,72.0,35.0,155.5482233502538,33.6,0.627,50.0]                            |
|[1.0,85.0,66.0,29.0,155.5482233502538,26.6,0.351,31.0]                             |
|[8.0,183.0,64.0,29.153419593345657,155.5482233502538,23.3,0.672,32.0]              |
|[1.0,89.0,66.0,23.0,94.0,28.1,0.167,21.0]                                          |
|[0.0,137.0,40.0,35.0,168.0,43.1,2.288,33.0]                                        |
|[5.0,116.0,74.0,29.153419593345657,155.5482233502538,25.6,0.201,30.0]              |
|[3.0,78.0,50.0,32.0,88.0,31.0,0.248,26.0]                                          |
|[10.0,115.0,72.40518417462484,29.153419593345657,155.5482233502538,35.3,0.134,29.0]|
|[2.0,197.0,70.0,45.0,543.0,30.5,0.158,53.0]          

In [17]:
from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
raw_data=standardscaler.fit(raw_data).transform(raw_data)
raw_data.select("features","Scaled_features").show(5)

+--------------------+--------------------+
|            features|     Scaled_features|
+--------------------+--------------------+
|[6.0,148.0,72.0,3...|[1.78063837321943...|
|[1.0,85.0,66.0,29...|[0.29677306220323...|
|[8.0,183.0,64.0,2...|[2.37418449762590...|
|[1.0,89.0,66.0,23...|[0.29677306220323...|
|[0.0,137.0,40.0,3...|[0.0,4.5012560836...|
+--------------------+--------------------+
only showing top 5 rows



In [18]:
train, test = raw_data.randomSplit([0.8, 0.2], seed=12345)

In [19]:
dataset_size=float(train.select("Outcome").count())
numPositives=train.select("Outcome").where('Outcome == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 206
Percentage of ones are 34.27620632279534


In [20]:
BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0.6572379367720466


In [21]:
train=train.withColumn("classWeights", when(train.Outcome == 1,BalancingRatio).otherwise(1-BalancingRatio))
train.select("classWeights").show(5)

+-------------------+
|       classWeights|
+-------------------+
|0.34276206322795344|
|0.34276206322795344|
|0.34276206322795344|
|0.34276206322795344|
|0.34276206322795344|
+-------------------+
only showing top 5 rows



In [22]:
# Feature selection using chisquareSelector
from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='Scaled_features',outputCol='Aspect',labelCol='Outcome',fpr=0.05)
train=css.fit(train).transform(train)
test=css.fit(test).transform(test)
test.select("Aspect").show(5,truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------+
|Aspect                                                                                                                                 |
+---------------------------------------------------------------------------------------------------------------------------------------+
|[0.0,2.562758938133151,7.274924068899646,3.29885013976358,0.47047140468429116,5.367154589366346,1.3098778871870878,1.785679127775751]  |
|[0.0,2.8256060087109103,5.621532235058818,3.640110499049468,1.8295247783934943,5.207158111092553,0.7183201316832416,2.125808485447323] |
|[0.0,2.9898854278220095,5.621532235058818,3.640110499049468,2.4699748745925287,5.8035086210221465,1.1499158410559458,2.125808485447323]|
|[0.0,3.1213089631108892,5.290853868290652,4.436384670716539,1.2349874372962644,6.487129937282901,1.1046435638490186,1.870711467193644] |
|[0.0,3.1213089631108892,6.6135673

In [23]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Outcome", featuresCol="Aspect",weightCol="classWeights",maxIter=10)
model=lr.fit(train)
predict_train=model.transform(train)
predict_test=model.transform(test)
predict_test.select("Outcome","prediction").show(10)

+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       1.0|
+-------+----------+
only showing top 10 rows



In [25]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="Outcome")
predict_test.select("Outcome","rawPrediction","prediction","probability").show(5)
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

+-------+--------------------+----------+--------------------+
|Outcome|       rawPrediction|prediction|         probability|
+-------+--------------------+----------+--------------------+
|      0|[2.58888023648551...|       0.0|[0.93014249279728...|
|      0|[2.32098145085482...|       0.0|[0.91059987057264...|
|      0|[1.68081620680194...|       0.0|[0.84301258010142...|
|      0|[0.64946166218389...|       0.0|[0.65688913922505...|
|      0|[1.78997774283908...|       0.0|[0.85692454770533...|
+-------+--------------------+----------+--------------------+
only showing top 5 rows

The area under ROC for train set is 0.838687476957108
The area under ROC for test set is 0.8447004608294926


In [26]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder()\
    .addGrid(lr.aggregationDepth,[2,5,10])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.fitIntercept,[False, True])\
    .addGrid(lr.maxIter,[10, 100, 1000])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

In [27]:
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations
cvModel = cv.fit(train)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing
predict_train=cvModel.transform(train)
predict_test=cvModel.transform(test)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(predict_test)))

KeyboardInterrupt: 

Spark