# Setup Jupyter Notebook for Apache Spark

In [41]:
from pyspark.sql import Row
a = Row(name='Antony', age=27, height=168)
print("a: ", a)

a:  Row(name='Antony', age=27, height=168)
a:  Row(name='Antony', age=27, height=168)


In [42]:
from pyspark import SparkContext, SparkConf
sc = SparkContext.getOrCreate()
sc

In [43]:
df = sc.parallelize([ \
    Row(nama='Adrian', umur=21, tinggi=165), \
    Row(nama='Dwi', umur=21, tinggi=165), \
    Row(nama='Adinata', umur=21, tinggi=165)]).toDF()
df.show()

+-------+----+------+
|   nama|umur|tinggi|
+-------+----+------+
| Adrian|  21|   165|
|    Dwi|  21|   165|
|Adinata|  21|   165|
+-------+----+------+

+-------+----+------+
|   nama|umur|tinggi|
+-------+----+------+
| Adrian|  21|   165|
|    Dwi|  21|   165|
|Adinata|  21|   165|
+-------+----+------+



# Introduction to DataFrame dan SQL Spark

## Membaca Data File ke DataFrame

In [44]:
#mengimpor modul yang dibutuhkan
from pyspark.sql import SparkSession
from pyspark.sql.types import *

#buat sesi agar dapat mengakses semua API Spark
spark = SparkSession \
    .builder \
    .appName("Introdution to Spark DataFrame") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

#tentukan skema data untuk file yang ingin kita baca
purchaseSchema = StructType([
    StructField("Date", DateType(), True),
    StructField("Time", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Item", StringType(), True),
    StructField("Total", FloatType(), True),
    StructField("Payment", StringType(), True),
])    

#baca file csv dengan skema yang kita tentukan ke dalam Spark DataFrame, dan gunakan pembatas "tab"
purchaseDataframe = spark.read.csv(
    "dataset/purchases.csv", 
    header=True, schema=purchaseSchema, sep="\t")
#tampilkan 3 baris DataFrame
purchaseDataframe.show(3)

+----------+-----+----------+----------------+------+-------+
|      Date| Time|      City|            Item| Total|Payment|
+----------+-----+----------+----------------+------+-------+
|2012-01-01|09:00|  San Jose|  Men's Clothing|214.05|   Amex|
|2012-01-01|09:00|Fort Worth|Women's Clothing|153.57|   Visa|
|2012-01-01|09:00| San Diego|           Music| 66.08|   Cash|
+----------+-----+----------+----------------+------+-------+
only showing top 3 rows

+----------+-----+----------+----------------+------+-------+
|      Date| Time|      City|            Item| Total|Payment|
+----------+-----+----------+----------------+------+-------+
|2012-01-01|09:00|  San Jose|  Men's Clothing|214.05|   Amex|
|2012-01-01|09:00|Fort Worth|Women's Clothing|153.57|   Visa|
|2012-01-01|09:00| San Diego|           Music| 66.08|   Cash|
+----------+-----+----------+----------------+------+-------+
only showing top 3 rows



## Menghitung jumlah baris, mencetak skema kerangka data, dan mencetak statistik data

In [45]:
#hitung jumlah baris dataFrame
num_rows = purchaseDataframe.count()
print("number of rows: ", num_rows)
#tampilkan skema dataFrame
purchaseDataframe.printSchema()
#tampilkan statistik dari data yang kita inginkan
purchaseDataframe.describe('Total').show()

number of rows:  4138476
root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Total: float (nullable = true)
 |-- Payment: string (nullable = true)

number of rows:  4138476
root
 |-- Date: date (nullable = true)
 |-- Time: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Item: string (nullable = true)
 |-- Total: float (nullable = true)
 |-- Payment: string (nullable = true)

+-------+------------------+
|summary|             Total|
+-------+------------------+
|  count|           4138476|
|   mean|249.96108549398525|
| stddev| 144.3174111542959|
|    min|               0.0|
|    max|            499.99|
+-------+------------------+

+-------+------------------+
|summary|             Total|
+-------+------------------+
|  count|           4138476|
|   mean|249.96108549398525|
| stddev| 144.3174111542959|
|    min|               0.0|
|    max|            499.99|
+-------+-

## Membuat dataFrame baru dari subset dataFrame yang ada

In [46]:
#buat dataFrame baru dari kolom "Kota" dan "Total"
newDataframe = purchaseDataframe.select(purchaseDataframe['City'], 
                                              purchaseDataframe['Total'])
newDataframe.show(3); #menampilkan 3 baris DataFrame baru
newDataframe.printSchema() #print skema dari DataFrame baru

+----------+------+
|      City| Total|
+----------+------+
|  San Jose|214.05|
|Fort Worth|153.57|
| San Diego| 66.08|
+----------+------+
only showing top 3 rows

root
 |-- City: string (nullable = true)
 |-- Total: float (nullable = true)

+----------+------+
|      City| Total|
+----------+------+
|  San Jose|214.05|
|Fort Worth|153.57|
| San Diego| 66.08|
+----------+------+
only showing top 3 rows

root
 |-- City: string (nullable = true)
 |-- Total: float (nullable = true)



## Menambahkan nilai konstan ke setiap data baris di kolom tertentu

In [47]:
#menambahkan nilai konstan 10 untuk setiap data baris di kolom "Total"
purchaseDataframe.select(purchaseDataframe['City'],
                         purchaseDataframe['Total']+10).show(3)

+----------+------------+
|      City|(Total + 10)|
+----------+------------+
|  San Jose|      224.05|
|Fort Worth|      163.57|
| San Diego|       76.08|
+----------+------------+
only showing top 3 rows

+----------+------------+
|      City|(Total + 10)|
+----------+------------+
|  San Jose|      224.05|
|Fort Worth|      163.57|
| San Diego|       76.08|
+----------+------------+
only showing top 3 rows



## Memfilter dataFrame menggunakan kondisi yang ditentukan

In [48]:
#filter hanya data baris yang nilai kolom "Total" > 200
purchaseDataframe.filter(purchaseDataframe['Total'] > 200).show(3)

+----------+-----+----------+-------------------+------+----------+
|      Date| Time|      City|               Item| Total|   Payment|
+----------+-----+----------+-------------------+------+----------+
|2012-01-01|09:00|  San Jose|     Men's Clothing|214.05|      Amex|
|2012-01-01|09:00|Pittsburgh|       Pet Supplies|493.51|  Discover|
|2012-01-01|09:00|     Omaha|Children's Clothing|235.63|MasterCard|
+----------+-----+----------+-------------------+------+----------+
only showing top 3 rows

+----------+-----+----------+-------------------+------+----------+
|      Date| Time|      City|               Item| Total|   Payment|
+----------+-----+----------+-------------------+------+----------+
|2012-01-01|09:00|  San Jose|     Men's Clothing|214.05|      Amex|
|2012-01-01|09:00|Pittsburgh|       Pet Supplies|493.51|  Discover|
|2012-01-01|09:00|     Omaha|Children's Clothing|235.63|MasterCard|
+----------+-----+----------+-------------------+------+----------+
only showing top 3 rows

## Menyortir dataFrame berdasarkan kolom tertentu

In [49]:
sortedByCity = purchaseDataframe.orderBy('City').show(4)

+----------+-----+-----------+----------------+------+----------+
|      Date| Time|       City|            Item| Total|   Payment|
+----------+-----+-----------+----------------+------+----------+
|2012-10-07|11:11|Albuquerque|    Pet Supplies| 308.7|      Visa|
|2012-10-07|11:40|Albuquerque|            Toys|299.63|MasterCard|
|2012-10-07|11:13|Albuquerque|Women's Clothing|419.49|  Discover|
|2012-10-07|10:39|Albuquerque|    Pet Supplies| 401.3|MasterCard|
+----------+-----+-----------+----------------+------+----------+
only showing top 4 rows

+----------+-----+-----------+----------------+------+----------+
|      Date| Time|       City|            Item| Total|   Payment|
+----------+-----+-----------+----------------+------+----------+
|2012-10-07|11:11|Albuquerque|    Pet Supplies| 308.7|      Visa|
|2012-10-07|11:40|Albuquerque|            Toys|299.63|MasterCard|
|2012-10-07|11:13|Albuquerque|Women's Clothing|419.49|  Discover|
|2012-10-07|10:39|Albuquerque|    Pet Supplies| 401

## Menghitung jumlah transaksi di setiap kota

In [50]:
numTransactionEachCity = purchaseDataframe.groupBy("City").count()
numTransactionEachCity.show(5)

+---------------+-----+
|           City|count|
+---------------+-----+
|North Las Vegas|40013|
|        Phoenix|40333|
|          Omaha|40209|
|      Anchorage|39806|
|        Anaheim|40086|
+---------------+-----+
only showing top 5 rows

+---------------+-----+
|           City|count|
+---------------+-----+
|North Las Vegas|40013|
|        Phoenix|40333|
|          Omaha|40209|
|      Anchorage|39806|
|        Anaheim|40086|
+---------------+-----+
only showing top 5 rows



## Mengindeks dan Mengakses DataFrame

In [51]:
#mengimpor monotonically_increasing_id
from pyspark.sql.functions import monotonically_increasing_id

newPurchasedDataframe = purchaseDataframe.withColumn(
    "index", monotonically_increasing_id())
newPurchasedDataframe.show(7)
row2Till4 = newPurchasedDataframe.filter((newPurchasedDataframe['index']>=2) &
                                         (newPurchasedDataframe['index']<=4))
row2Till4.show()

+----------+-----+----------+-------------------+------+----------+-----+
|      Date| Time|      City|               Item| Total|   Payment|index|
+----------+-----+----------+-------------------+------+----------+-----+
|2012-01-01|09:00|  San Jose|     Men's Clothing|214.05|      Amex|    0|
|2012-01-01|09:00|Fort Worth|   Women's Clothing|153.57|      Visa|    1|
|2012-01-01|09:00| San Diego|              Music| 66.08|      Cash|    2|
|2012-01-01|09:00|Pittsburgh|       Pet Supplies|493.51|  Discover|    3|
|2012-01-01|09:00|     Omaha|Children's Clothing|235.63|MasterCard|    4|
|2012-01-01|09:00|  Stockton|     Men's Clothing|247.18|MasterCard|    5|
|2012-01-01|09:00|    Austin|            Cameras| 379.6|      Visa|    6|
+----------+-----+----------+-------------------+------+----------+-----+
only showing top 7 rows

+----------+-----+----------+-------------------+------+----------+-----+
|      Date| Time|      City|               Item| Total|   Payment|index|
+----------+-

In [52]:
dataRow2ColumnTotal = newPurchasedDataframe.filter(newPurchasedDataframe['index']==2).select('Total')
dataRow2ColumnTotal.show()

+-----+
|Total|
+-----+
|66.08|
+-----+

+-----+
|Total|
+-----+
|66.08|
+-----+



## Menggunakan kueri SQL di dataFrame

In [53]:
#kita perlu membuat tampilan sementara sql untuk dataFrame kita
purchaseDataframe.createOrReplaceTempView("purchaseSql")

#pilih kolom "Total" dan "Payment" dari tampilan sementara sql
anotherNewDataframe = spark.sql("SELECT Total, Payment FROM purchaseSql")
anotherNewDataframe.show(3)

+------+-------+
| Total|Payment|
+------+-------+
|214.05|   Amex|
|153.57|   Visa|
| 66.08|   Cash|
+------+-------+
only showing top 3 rows

+------+-------+
| Total|Payment|
+------+-------+
|214.05|   Amex|
|153.57|   Visa|
| 66.08|   Cash|
+------+-------+
only showing top 3 rows



In [54]:
#sorting data berdasarkan kolom "City" menurut abjad
orderByCity = spark.sql("SELECT * FROM purchaseSql ORDER BY City")
orderByCity.show(5)

+----------+-----+-----------+----------------+------+----------+
|      Date| Time|       City|            Item| Total|   Payment|
+----------+-----+-----------+----------------+------+----------+
|2012-10-07|11:11|Albuquerque|    Pet Supplies| 308.7|      Visa|
|2012-10-07|11:41|Albuquerque|           Music|365.64|      Visa|
|2012-10-07|11:13|Albuquerque|Women's Clothing|419.49|  Discover|
|2012-10-07|10:39|Albuquerque|    Pet Supplies| 401.3|MasterCard|
|2012-10-07|11:18|Albuquerque|          Crafts|475.77|      Visa|
+----------+-----+-----------+----------------+------+----------+
only showing top 5 rows

+----------+-----+-----------+----------------+------+----------+
|      Date| Time|       City|            Item| Total|   Payment|
+----------+-----+-----------+----------------+------+----------+
|2012-10-07|11:11|Albuquerque|    Pet Supplies| 308.7|      Visa|
|2012-10-07|11:41|Albuquerque|           Music|365.64|      Visa|
|2012-10-07|11:13|Albuquerque|Women's Clothing|419.

In [55]:
#filter nilai kolom Total>50 dan urutkan berdasarkan cara pembayaran
filterAndSortWithSQL = spark.sql("SELECT * FROM purchaseSql WHERE Total>200 ORDER BY Payment")
filterAndSortWithSQL.show(4)

+----------+-----+--------------+-------------------+------+-------+
|      Date| Time|          City|               Item| Total|Payment|
+----------+-----+--------------+-------------------+------+-------+
|2012-10-07|10:34|      Richmond|Children's Clothing|252.45|   Amex|
|2012-10-07|10:36|San Bernardino|               Toys|272.91|   Amex|
|2012-10-07|10:34|     Baltimore|              Books|299.94|   Amex|
|2012-10-07|10:33|       Lincoln|       Pet Supplies|359.44|   Amex|
+----------+-----+--------------+-------------------+------+-------+
only showing top 4 rows

+----------+-----+--------------+-------------------+------+-------+
|      Date| Time|          City|               Item| Total|Payment|
+----------+-----+--------------+-------------------+------+-------+
|2012-10-07|10:34|      Richmond|Children's Clothing|252.45|   Amex|
|2012-10-07|10:36|San Bernardino|               Toys|272.91|   Amex|
|2012-10-07|10:34|     Baltimore|              Books|299.94|   Amex|
|2012-10-

# Data Preprocessing in Spark

## Baca file data dan buat skema data

In [56]:
from pyspark.sql.types import *

#membuat session
appName = "data preprocessing in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

flights = spark.read.csv('dataset/raw-flight-data.csv', 
                         schema=flightSchema, header=True)
flights.show(2)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 2 rows



In [57]:
airportSchema = StructType([
  StructField("airport_id", IntegerType(), False),
  StructField("city", StringType(), False),
  StructField("state", StringType(), False),
  StructField("name", StringType(), False),
])

airports = spark.read.csv('dataset/airports.csv', header=True, 
                          schema=airportSchema)
airports.show(2)

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows

+----------+-----------+-----+--------------------+
|airport_id|       city|state|                name|
+----------+-----------+-----+--------------------+
|     10165|Adak Island|   AK|                Adak|
|     10299|  Anchorage|   AK|Ted Stevens Ancho...|
+----------+-----------+-----+--------------------+
only showing top 2 rows



Gabungkan dua dataFrame (penerbangan dan bandara), dan tunjukkan berapa banyak penerbangan dari setiap Kota

In [58]:
flightsByOrigin = flights.join(airports,
                               flights.OriginAirportID == 
                               airports.airport_id).groupBy("city").count()
flightsByOrigin.show(3)

+--------------+-----+
|          city|count|
+--------------+-----+
|       Phoenix|90281|
|         Omaha|13537|
|Raleigh/Durham|28436|
+--------------+-----+
only showing top 3 rows

+--------------+-----+
|          city|count|
+--------------+-----+
|       Phoenix|90281|
|         Omaha|13537|
|Raleigh/Durham|28436|
+--------------+-----+
only showing top 3 rows



## Menangani duplikasi data

Hapus duplikasi data dan hitung berapa banyak data diduplikasi

In [59]:
#hitung jumlah baris data asli
n1 = flights.count()
print("number of original data rows: ", n1)
#hitung jumlah baris data setelah menghapus data yang diduplikasi
n2 = flights.dropDuplicates().count()
print("number of data rows after deleting duplicated data: ", n2)
n3 = n1 - n2
print("number of duplicated data: ", n3)

number of original data rows:  2719418
number of original data rows:  2719418
number of data rows after deleting duplicated data:  2696983
number of duplicated data:  22435
number of data rows after deleting duplicated data:  2696983
number of duplicated data:  22435


Kita juga dapat menentukan kriteria duplikat dengan kolom tertentu

In [60]:
df = spark.createDataFrame([("Rony",27, 168), 
                            ("Rony",15, 165), 
                            ("Rony",27, 168)], 
                           ["name","age","height"])
df.show()
df.dropDuplicates().show()

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
|Rony| 27|   168|
+----+---+------+

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
|Rony| 27|   168|
+----+---+------+

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
+----+---+------+

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
|Rony| 15|   165|
+----+---+------+



In [61]:
df.dropDuplicates(['name']).show()

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
+----+---+------+

+----+---+------+
|name|age|height|
+----+---+------+
|Rony| 27|   168|
+----+---+------+



## Menangani data yang hilang

Hapus baris jika setidaknya ada satu (kolom) data yang hilang

In [62]:
flightsNoMissingValue = flights.dropDuplicates().dropna(
    how="any", subset=["ArrDelay", "DepDelay"])# gunakan how="all" untuk semua kolom data yang hilang
numberOfMissingValueAny = n1 - flightsNoMissingValue.count()
print("number of missing value rows: ", numberOfMissingValueAny)

number of missing value rows:  46233
number of missing value rows:  46233


Isi data yang hilang menggunakan nilai rata-rata dari setiap data kolom yang sesuai

In [63]:
#ambil nilai rata-rata
meanArrDelay = flights.groupBy().avg("ArrDelay").take(1)[0][0]
print("mean ArrDelay: ", meanArrDelay)
meanDepDelay = flights.groupBy().avg("DepDelay").take(1)[0][0]
print("mean DepDelay: ", meanDepDelay)
#hapus data yang digandakan dan isi data yang hilang dengan nilai rata-rata
flightsCleanData=flights.fillna(
    {'ArrDelay': meanArrDelay, 'DepDelay': meanDepDelay})
#hanya untuk percobaan
flights.groupBy().avg("ArrDelay").show()

mean ArrDelay:  6.63768791455498
mean ArrDelay:  6.63768791455498
mean DepDelay:  10.53686662649788
mean DepDelay:  10.53686662649788
+----------------+
|   avg(ArrDelay)|
+----------------+
|6.63768791455498|
+----------------+

+----------------+
|   avg(ArrDelay)|
+----------------+
|6.63768791455498|
+----------------+



## Jelajahi statistik data

In [64]:
flightsCleanData.describe('DepDelay','ArrDelay').show()

+-------+------------------+-----------------+
|summary|          DepDelay|         ArrDelay|
+-------+------------------+-----------------+
|  count|           2719418|          2719418|
|   mean|10.531448640848888|6.630879842672218|
| stddev| 35.91695039008075|38.44200618946938|
|    min|               -63|              -94|
|    max|              1863|             1845|
+-------+------------------+-----------------+

+-------+------------------+-----------------+
|summary|          DepDelay|         ArrDelay|
+-------+------------------+-----------------+
|  count|           2719418|          2719418|
|   mean|10.531448640848888|6.630879842672218|
| stddev| 35.91695039008075|38.44200618946938|
|    min|               -63|              -94|
|    max|              1863|             1845|
+-------+------------------+-----------------+



Kita juga dapat menghitung korelasi antara dua variabel untuk mengetahui apakah variabel tersebut berhubungan satu sama lain atau tidak

In [65]:
correlation = flightsCleanData.corr('DepDelay', 'ArrDelay')
print("correlation between departure delay and arrival delay: ", 
      correlation)

correlation between departure delay and arrival delay:  0.939353821557276
correlation between departure delay and arrival delay:  0.939353821557276


# Regression in Spark

## Meningimpor modul dan membuat sesi spark

In [66]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

#create Spark session
appName = "Regression in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Membaca data file kedalam dataframe spark

In [67]:
#define our schema
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])
#read csv data with our defined schema
flightDataFrame = spark.read.csv('dataset/flights.csv', schema=flightSchema, header=True)
flightDataFrame.show(3)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     

## Memilih data atau fitur penting untuk regresi

In [68]:
#select related column data for our regression input features
data = flightDataFrame.select("DayofMonth", "DayOfWeek", 
                              "OriginAirportID", "DestAirportID", 
                              "DepDelay", "ArrDelay")
data.show(3)

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|        19|        5|          11433|        13303|      -3|       1|
|        19|        5|          14869|        12478|       0|      -8|
|        19|        5|          14057|        14869|      -4|     -15|
+----------+---------+---------------+-------------+--------+--------+
only showing top 3 rows

+----------+---------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+---------------+-------------+--------+--------+
|        19|        5|          11433|        13303|      -3|       1|
|        19|        5|          14869|        12478|       0|      -8|
|        19|        5|          14057|        14869|      -4|     -15|
+----------+---------+---------------+-------------+

## Membagi data kedalam data latih dan data test

In [69]:
#membagi data, 70% training, 30% testing
dividedData = data.randomSplit([0.7, 0.3])
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print("Training data rows: ", train_rows, "; Testing data rows:", test_rows)

Training data rows:  1891330 ; Testing data rows: 810888
Training data rows:  1891330 ; Testing data rows: 810888


## Mempersiapkan data latih

In [70]:
#define an assembler
assembler = VectorAssembler(inputCols=[
    "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", 
    "DepDelay"], outputCol="features")

#change our features into one column using our defined assembler
trainingDataFinal = assembler.transform(trainingData).select(
    col("features"), (col("ArrDelay").cast("Int").alias("label")))
trainingDataFinal.show(truncate=False , n=3)

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10397.0,-4.0]|-11  |
|[1.0,1.0,10140.0,10397.0,-2.0]|-17  |
|[1.0,1.0,10140.0,10821.0,8.0] |-9   |
+------------------------------+-----+
only showing top 3 rows

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10397.0,-4.0]|-11  |
|[1.0,1.0,10140.0,10397.0,-2.0]|-17  |
|[1.0,1.0,10140.0,10821.0,8.0] |-9   |
+------------------------------+-----+
only showing top 3 rows



## Melatih model regresi menggunakan data latih

In [71]:
#call Spark linear regression we import before
algoritma = LinearRegression(
    labelCol="label",featuresCol="features", 
    maxIter=10, regParam=0.3)
#train the model
model = algoritma.fit(trainingDataFinal)
print ("Regression model is trained!")

Regression model is trained!
Regression model is trained!


# Mempersiapkan data testing

In [72]:
#change our feature data into one column using our defined assembler
#just like what we did before in the training data
testingDataFinal = assembler.transform(
    testingData).select(
    col("features"), (col("ArrDelay")).cast("Int").alias("trueLabel"))
testingDataFinal.show(truncate=False, n=2)

+------------------------------+---------+
|features                      |trueLabel|
+------------------------------+---------+
|[1.0,1.0,10140.0,11259.0,-2.0]|-14      |
|[1.0,1.0,10140.0,11259.0,-1.0]|-11      |
+------------------------------+---------+
only showing top 2 rows

+------------------------------+---------+
|features                      |trueLabel|
+------------------------------+---------+
|[1.0,1.0,10140.0,11259.0,-2.0]|-14      |
|[1.0,1.0,10140.0,11259.0,-1.0]|-11      |
+------------------------------+---------+
only showing top 2 rows



## Memprediksi data testing menggunakan model yang telah dilatih

In [73]:
#predict testing data using our model
prediction = model.transform(testingDataFinal)
#show some prediction results
prediction.show(3)

+--------------------+---------+-------------------+
|            features|trueLabel|         prediction|
+--------------------+---------+-------------------+
|[1.0,1.0,10140.0,...|      -14| -5.758745937056345|
|[1.0,1.0,10140.0,...|      -11| -4.761671320056825|
|[1.0,1.0,10140.0,...|      -12|-3.7645967030573058|
+--------------------+---------+-------------------+
only showing top 3 rows

+--------------------+---------+-------------------+
|            features|trueLabel|         prediction|
+--------------------+---------+-------------------+
|[1.0,1.0,10140.0,...|      -14| -5.758745937056345|
|[1.0,1.0,10140.0,...|      -11| -4.761671320056825|
|[1.0,1.0,10140.0,...|      -12|-3.7645967030573058|
+--------------------+---------+-------------------+
only showing top 3 rows



## Menghitung performa model

In [74]:
#import evaluator module for regression
from pyspark.ml.evaluation import RegressionEvaluator

#define our evaluator
evaluator = RegressionEvaluator(
    labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
#calculate RMSE of our trained model
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 13.183545001214409
Root Mean Square Error (RMSE): 13.183545001214409


# Classification in Spark

## Mengimpor modul dan membuat sesi spark

In [75]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

#create Spark session
appName = "Classification in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Membaca data file kedalam dataframe spark

In [76]:
#define our schema
flightSchema = StructType([
  StructField("DayofMonth", IntegerType(), False),
  StructField("DayOfWeek", IntegerType(), False),
  StructField("Carrier", StringType(), False),
  StructField("OriginAirportID", IntegerType(), False),
  StructField("DestAirportID", IntegerType(), False),
  StructField("DepDelay", IntegerType(), False),
  StructField("ArrDelay", IntegerType(), False),
])

#read csv data with our defined schema
csv = spark.read.csv('dataset/flights.csv', schema=flightSchema, header=True)
csv.show(3)

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     DL|          14057|        14869|      -4|     -15|
+----------+---------+-------+---------------+-------------+--------+--------+
only showing top 3 rows

+----------+---------+-------+---------------+-------------+--------+--------+
|DayofMonth|DayOfWeek|Carrier|OriginAirportID|DestAirportID|DepDelay|ArrDelay|
+----------+---------+-------+---------------+-------------+--------+--------+
|        19|        5|     DL|          11433|        13303|      -3|       1|
|        19|        5|     DL|          14869|        12478|       0|      -8|
|        19|        5|     

## Memilih data penting atau fitur data untuk klasifikasi dan mengubah fitur arrival delay ke binary class "late" vs "not late"

In [77]:
data = csv.select(
    "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", 
    "DepDelay", ((col("ArrDelay") > 15).cast("Int").alias("Late")))
data.show(3)

+----------+---------+---------------+-------------+--------+----+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|Late|
+----------+---------+---------------+-------------+--------+----+
|        19|        5|          11433|        13303|      -3|   0|
|        19|        5|          14869|        12478|       0|   0|
|        19|        5|          14057|        14869|      -4|   0|
+----------+---------+---------------+-------------+--------+----+
only showing top 3 rows

+----------+---------+---------------+-------------+--------+----+
|DayofMonth|DayOfWeek|OriginAirportID|DestAirportID|DepDelay|Late|
+----------+---------+---------------+-------------+--------+----+
|        19|        5|          11433|        13303|      -3|   0|
|        19|        5|          14869|        12478|       0|   0|
|        19|        5|          14057|        14869|      -4|   0|
+----------+---------+---------------+-------------+--------+----+
only showing top 3 rows



## Membagi data kedalam data latih dan data test

In [78]:
#membagi data, 70% training, 30% testing
dividedData = data.randomSplit([0.7, 0.3])
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print("Training data rows: ", train_rows, "; Testing data rows:", test_rows)

Training data rows:  1891093 ; Testing data rows: 811125
Training data rows:  1891093 ; Testing data rows: 811125


## Menyipkan data latih

In [79]:
#define an assembler
assembler = VectorAssembler(inputCols = [
    "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", 
    "DepDelay"], outputCol="features")
trainingDataFinal = assembler.transform(
    trainingData).select(col("features"), col("Late").alias("label"))
trainingDataFinal.show(truncate=False, n=2)

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10821.0,8.0] |0    |
|[1.0,1.0,10140.0,11259.0,-3.0]|0    |
+------------------------------+-----+
only showing top 2 rows

+------------------------------+-----+
|features                      |label|
+------------------------------+-----+
|[1.0,1.0,10140.0,10821.0,8.0] |0    |
|[1.0,1.0,10140.0,11259.0,-3.0]|0    |
+------------------------------+-----+
only showing top 2 rows



## Melatih model klasifikasi menggunakan data latih

In [83]:
#define our classifier
classifier = LogisticRegression(
    labelCol="label",featuresCol="features",maxIter=10,regParam=0.3)
#train our classifier
model = classifier.fit(trainingDataFinal)
print ("Classifier model is trained!")

Classifier model is trained!
Classifier model is trained!


## Mempersiapkan data testing

In [84]:
testingDataFinal = assembler.transform(
    testingData).select(col("features"), col("Late").alias("trueLabel"))
testingDataFinal.show(3)

+--------------------+---------+
|            features|trueLabel|
+--------------------+---------+
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
+--------------------+---------+
only showing top 3 rows

+--------------------+---------+
|            features|trueLabel|
+--------------------+---------+
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
|[1.0,1.0,10140.0,...|        0|
+--------------------+---------+
only showing top 3 rows



## Memprediksi data testing menggunakan model yang telah dilatih

In [85]:
prediction = model.transform(testingDataFinal)
predictionFinal = prediction.select(
    "features", "prediction", "probability", "trueLabel")
predictionFinal.show(truncate=False, n=3)
prediction.show(truncate=False, n=3)

+------------------------------+----------+----------------------------------------+---------+
|features                      |prediction|probability                             |trueLabel|
+------------------------------+----------+----------------------------------------+---------+
|[1.0,1.0,10140.0,10397.0,-4.0]|0.0       |[0.83140579153588,0.16859420846412]     |0        |
|[1.0,1.0,10140.0,10397.0,-2.0]|0.0       |[0.8274504924386975,0.17254950756130247]|0        |
|[1.0,1.0,10140.0,11259.0,-2.0]|0.0       |[0.8278173299459717,0.1721826700540283] |0        |
+------------------------------+----------+----------------------------------------+---------+
only showing top 3 rows

+------------------------------+----------+----------------------------------------+---------+
|features                      |prediction|probability                             |trueLabel|
+------------------------------+----------+----------------------------------------+---------+
|[1.0,1.0,10140.0,10397.0

## Menghitung performa model

In [86]:
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['trueLabel']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, 
      ", accuracy:", correctPrediction/totalData)

correct prediction: 668288 , total data: 811125 , accuracy: 0.8239026044074588
correct prediction: 668288 , total data: 811125 , accuracy: 0.8239026044074588


## Mencoba algoritma klasifikasi lain yang disediakan Spark (Random Forest)

In [87]:
from pyspark.ml.classification import RandomForestClassifier

model2 = RandomForestClassifier(
    numTrees=3, maxDepth=5, seed=42, labelCol="label",featuresCol="features")
model2 = model2.fit(trainingDataFinal)
print ("Model is trained!")

Model is trained!
Model is trained!


In [88]:
prediction = model2.transform(testingDataFinal)
predictionFinal = prediction.select(
    "features", "prediction", "probability", "trueLabel")
predictionFinal.show(truncate=False, n=3)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['trueLabel']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", 
      totalData, ", accuracy", correctPrediction/totalData)

+------------------------------+----------+----------------------------------------+---------+
|features                      |prediction|probability                             |trueLabel|
+------------------------------+----------+----------------------------------------+---------+
|[1.0,1.0,10140.0,10397.0,-4.0]|0.0       |[0.9300913633017692,0.06990863669823082]|0        |
|[1.0,1.0,10140.0,10397.0,-2.0]|0.0       |[0.9300913633017692,0.06990863669823082]|0        |
|[1.0,1.0,10140.0,11259.0,-2.0]|0.0       |[0.9300913633017692,0.06990863669823082]|0        |
+------------------------------+----------+----------------------------------------+---------+
only showing top 3 rows

+------------------------------+----------+----------------------------------------+---------+
|features                      |prediction|probability                             |trueLabel|
+------------------------------+----------+----------------------------------------+---------+
|[1.0,1.0,10140.0,10397.0

# Sentiment Analysis In Spark

## Mengimpor modul dan membuat sesi spark

In [89]:
#import modules
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

#create Spark session
appName = "Sentiment Analysis in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Membaca data file kedalam dataframe spark

In [90]:
#read csv file into dataFrame with automatically inferred schema
tweets_csv = spark.read.csv('dataset/tweets.csv', inferSchema=True, header=True)
tweets_csv.show(truncate=False, n=3)

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only showing top 3 rows

+------+---------+---------------+---------------------------------+
|ItemID|Sentiment|SentimentSource|SentimentText                    |
+------+---------+---------------+---------------------------------+
|1038  |1        |Sentiment140   |that film is fantastic #brilliant|
|1804  |1        |Sentiment140   |this music is really bad #myband |
|1693  |0        |Sentiment140   |winter is terrible #thumbs-down  |
+------+---------+---------------+---------------------------------+
only show

## Memilih data yang berhubungan

In [91]:
#select only "SentimentText" and "Sentiment" column, 
#and cast "Sentiment" column data into integer
data = tweets_csv.select("SentimentText", col("Sentiment").cast("Int").alias("label"))
data.show(truncate = False,n=5)

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows

+---------------------------------+-----+
|SentimentText                    |label|
+---------------------------------+-----+
|that film is fantastic #brilliant|1    |
|this music is really bad #myband |1    |
|winter is terrible #thumbs-down  |0    |
|this game is awful #nightmare    |0    |
|I love jam #loveit               |1    |
+---------------------------------+-----+
only showing top 5 rows



## Membagi data kedalam data latih dan data test

In [92]:
#divide data, 70% for training, 30% for testing
dividedData = data.randomSplit([0.7, 0.3]) 
trainingData = dividedData[0] #index 0 = data training
testingData = dividedData[1] #index 1 = data testing
train_rows = trainingData.count()
test_rows = testingData.count()
print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)

Training data rows: 1370 ; Testing data rows: 562
Training data rows: 1370 ; Testing data rows: 562


## Menyipkan data latih

In [93]:
tokenizer = Tokenizer(inputCol="SentimentText", outputCol="SentimentWords")
tokenizedTrain = tokenizer.transform(trainingData)
tokenizedTrain.show(truncate=False, n=5)

+-------------------------+-----+------------------------------+
|SentimentText            |label|SentimentWords                |
+-------------------------+-----+------------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |
|I adore cheese #thumbs-up|1    |[i, adore, cheese, #thumbs-up]|
+-------------------------+-----+------------------------------+
only showing top 5 rows

+-------------------------+-----+------------------------------+
|SentimentText            |label|SentimentWords                |
+-------------------------+-----+------------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |


### Menghapus stop words (kata yang tidak penting menjadi fitur)

In [94]:
swr = StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                       outputCol="MeaningfulWords")
SwRemovedTrain = swr.transform(tokenizedTrain)
SwRemovedTrain.show(truncate=False, n=5)

+-------------------------+-----+------------------------------+---------------------------+
|SentimentText            |label|SentimentWords                |MeaningfulWords            |
+-------------------------+-----+------------------------------+---------------------------+
|I adore cheese #bestever |1    |[i, adore, cheese, #bestever] |[adore, cheese, #bestever] |
|I adore cheese #brilliant|1    |[i, adore, cheese, #brilliant]|[adore, cheese, #brilliant]|
|I adore cheese #favorite |1    |[i, adore, cheese, #favorite] |[adore, cheese, #favorite] |
|I adore cheese #loveit   |1    |[i, adore, cheese, #loveit]   |[adore, cheese, #loveit]   |
|I adore cheese #thumbs-up|1    |[i, adore, cheese, #thumbs-up]|[adore, cheese, #thumbs-up]|
+-------------------------+-----+------------------------------+---------------------------+
only showing top 5 rows

+-------------------------+-----+------------------------------+---------------------------+
|SentimentText            |label|SentimentWor

### Mengkonversi fitur kata ke fitur numerikal. Di spark 2.2.1, diimplementasikan dalam fungsi HashinhgTF menggunkan Austin Appleby's MurmurHash 3 Algorithm

In [95]:
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTrainData = hashTF.transform(SwRemovedTrain).select(
    'label', 'MeaningfulWords', 'features')
numericTrainData.show(truncate=False, n=3)

+-----+---------------------------+-------------------------------------------+
|label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #favorite] |(262144,[1689,100089,108624],[1.0,1.0,1.0])|
+-----+---------------------------+-------------------------------------------+
only showing top 3 rows

+-----+---------------------------+-------------------------------------------+
|label|MeaningfulWords            |features                                   |
+-----+---------------------------+-------------------------------------------+
|1    |[adore, cheese, #bestever] |(262144,[1689,91011,100089],[1.0,1.0,1.0]) |
|1    |[adore, cheese, #brilliant]|(262144,[1689,45361,100089],[1.0,1.0,1.0]) |
|1    |[adore, 

## Melatih model klasifikasi menggunakan data latih

In [96]:
lr = LogisticRegression(labelCol="label", featuresCol="features", 
                        maxIter=10, regParam=0.01)
model = lr.fit(numericTrainData)
print ("Training is done!")

Training is done!
Training is done!


## Mempersiapkan data testing

In [97]:
tokenizedTest = tokenizer.transform(testingData)
SwRemovedTest = swr.transform(tokenizedTest)
numericTest = hashTF.transform(SwRemovedTest).select(
    'Label', 'MeaningfulWords', 'features')
numericTest.show(truncate=False, n=2)

+-----+-------------------------------------+-------------------------------------------------------+
|Label|MeaningfulWords                      |features                                               |
+-----+-------------------------------------+-------------------------------------------------------+
|1    |[adore, classical, music, #bestever] |(262144,[91011,100089,102383,131250],[1.0,1.0,1.0,1.0])|
|1    |[adore, classical, music, #thumbs-up]|(262144,[88825,100089,102383,131250],[1.0,1.0,1.0,1.0])|
+-----+-------------------------------------+-------------------------------------------------------+
only showing top 2 rows

+-----+-------------------------------------+-------------------------------------------------------+
|Label|MeaningfulWords                      |features                                               |
+-----+-------------------------------------+-------------------------------------------------------+
|1    |[adore, classical, music, #bestever] |(262144,[910

## Memprediksi data testing dan menghitung akurasi model

In [98]:
prediction = model.transform(numericTest)
predictionFinal = prediction.select(
    "MeaningfulWords", "prediction", "Label")
predictionFinal.show(n=4, truncate = False)
correctPrediction = predictionFinal.filter(
    predictionFinal['prediction'] == predictionFinal['Label']).count()
totalData = predictionFinal.count()
print("correct prediction:", correctPrediction, ", total data:", totalData, 
      ", accuracy:", correctPrediction/totalData)

+-------------------------------------+----------+-----+
|MeaningfulWords                      |prediction|Label|
+-------------------------------------+----------+-----+
|[adore, classical, music, #bestever] |1.0       |1    |
|[adore, classical, music, #thumbs-up]|1.0       |1    |
|[adore, coffee, #bestever]           |1.0       |1    |
|[adore, coffee, #thumbs-up]          |1.0       |1    |
+-------------------------------------+----------+-----+
only showing top 4 rows

+-------------------------------------+----------+-----+
|MeaningfulWords                      |prediction|Label|
+-------------------------------------+----------+-----+
|[adore, classical, music, #bestever] |1.0       |1    |
|[adore, classical, music, #thumbs-up]|1.0       |1    |
|[adore, coffee, #bestever]           |1.0       |1    |
|[adore, coffee, #thumbs-up]          |1.0       |1    |
+-------------------------------------+----------+-----+
only showing top 4 rows

correct prediction: 554 , total data: 

# Clustering in Spark

## Mengimpor modul dan membuat sesi spark

In [99]:
#import modules
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

#create session
appName = "Clustering in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Membaca data file kedalam dataframe spark

In [100]:
#read csv file using automatically inferred schema
customers = spark.read.csv(
    'dataset/customers.csv', inferSchema=True, header=True)
customers.show(3)

+---------------+---+-------------+-----------+------+-------------+--------------+---------+----------+---------+----+
|   CustomerName|Age|MaritalStatus|IncomeRange|Gender|TotalChildren|ChildrenAtHome|Education|Occupation|HomeOwner|Cars|
+---------------+---+-------------+-----------+------+-------------+--------------+---------+----------+---------+----+
|    Aaron Adams| 42|            0|      50000|     0|            0|             0|        3|         2|        1|   1|
|Aaron Alexander| 40|            1|      50000|     0|            0|             0|        2|         2|        1|   2|
|    Aaron Allen| 63|            0|      25000|     0|            2|             1|        2|         1|        1|   2|
+---------------+---+-------------+-----------+------+-------------+--------------+---------+----------+---------+----+
only showing top 3 rows

+---------------+---+-------------+-----------+------+-------------+--------------+---------+----------+---------+----+
|   CustomerNam

Description for each column data:

- CustomerName: name of customer
- Age: age of customer (in year)
- MaritalStatus: (1=married, 0=not married)
- IncomeRange: income per year (in USD)
- Gender: (1=female, 2=male)
- TotalChildren: number of children customer has
- ChildrenAtHome: number of children living with customer (in the same home)
- Education: (1=high school, 2=bachelor, 3=master, 4=PhD, 5=Post-doc)
- Occupation: (0=unskilled manual work until 5=professional)
- HomeOwner: (1=owning a home, 0=not owning a home)
- Cars: number of car customer has

## Mempersiapkan data

In [101]:
#define assembler
assembler = VectorAssembler(inputCols = [
    "Age", "MaritalStatus", "IncomeRange", "Gender", "TotalChildren", 
    "ChildrenAtHome", "Education", "Occupation", "HomeOwner", "Cars"], 
                            outputCol="features")
data = assembler.transform(customers).select('CustomerName', 'features')
data.show(truncate = False, n=3)

+---------------+----------------------------------------------+
|CustomerName   |features                                      |
+---------------+----------------------------------------------+
|Aaron Adams    |[42.0,0.0,50000.0,0.0,0.0,0.0,3.0,2.0,1.0,1.0]|
|Aaron Alexander|[40.0,1.0,50000.0,0.0,0.0,0.0,2.0,2.0,1.0,2.0]|
|Aaron Allen    |[63.0,0.0,25000.0,0.0,2.0,1.0,2.0,1.0,1.0,2.0]|
+---------------+----------------------------------------------+
only showing top 3 rows

+---------------+----------------------------------------------+
|CustomerName   |features                                      |
+---------------+----------------------------------------------+
|Aaron Adams    |[42.0,0.0,50000.0,0.0,0.0,0.0,3.0,2.0,1.0,1.0]|
|Aaron Alexander|[40.0,1.0,50000.0,0.0,0.0,0.0,2.0,2.0,1.0,2.0]|
|Aaron Allen    |[63.0,0.0,25000.0,0.0,2.0,1.0,2.0,1.0,1.0,2.0]|
+---------------+----------------------------------------------+
only showing top 3 rows



## Membuat Model Clustering K-Means

In [102]:
#define kMeans clustering algorithm
kmeans = KMeans(
    featuresCol=assembler.getOutputCol(), 
    predictionCol="cluster", k=5)
model = kmeans.fit(data)
print ("Model is successfully trained!")

Model is successfully trained!
Model is successfully trained!


## Mencetak centroid pada setiap cluster

In [103]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[5.19737441e+01 5.26868545e-01 5.00000000e+04 4.93961141e-01
 1.34552774e+00 4.98337126e-01 3.23035183e+00 2.77927534e+00
 6.62699107e-01 1.14615789e+00]
[5.82794840e+01 6.22850123e-01 1.50000000e+05 4.79729730e-01
 2.07248157e+00 3.20638821e+00 3.41461916e+00 4.34705160e+00
 6.48648649e-01 3.10995086e+00]
[5.53417813e+01 5.72411296e-01 1.00000000e+05 4.97103548e-01
 2.54380883e+00 1.54272266e+00 3.46198407e+00 4.19116582e+00
 7.16509776e-01 1.94532947e+00]
[5.60711289e+01 5.83804487e-01 7.50000000e+04 5.03921211e-01
 2.17308043e+00 8.16706183e-01 3.73244574e+00 3.92759438e+00
 7.23326646e-01 1.38063104e+00]
[5.31013005e+01 4.17180014e-01 2.50000000e+04 4.80492813e-01
 1.41512663e+00 6.08487337e-01 2.31622177e+00 1.45448323e+00
 5.93086927e-01 1.11464750e+00]
Cluster Centers: 
[5.19737441e+01 5.26868545e-01 5.00000000e+04 4.93961141e-01
 1.34552774e+00 4.98337126e-01 3.23035183e+00 2.77927534e+00
 6.62699107e-01 1.14615789e+00]
[5.82794840e+01 6.22850123e-01 1.5000000

## Cluster The Data

In [104]:
prediction = model.transform(data)#cluster given data
prediction.groupBy("cluster").count().orderBy("cluster").show()#count members in each cluster
prediction.select('CustomerName', 'cluster').show(5)#show several cluster

+-------+-----+
|cluster|count|
+-------+-----+
|      0| 5713|
|      1| 1628|
|      2| 2762|
|      3| 5483|
|      4| 2922|
+-------+-----+

+---------------+-------+
|   CustomerName|cluster|
+---------------+-------+
|    Aaron Adams|      0|
|Aaron Alexander|      0|
|    Aaron Allen|      4|
|    Aaron Baker|      0|
|   Aaron Bryant|      3|
+---------------+-------+
only showing top 5 rows

+-------+-----+
|cluster|count|
+-------+-----+
|      0| 5713|
|      1| 1628|
|      2| 2762|
|      3| 5483|
|      4| 2922|
+-------+-----+

+---------------+-------+
|   CustomerName|cluster|
+---------------+-------+
|    Aaron Adams|      0|
|Aaron Alexander|      0|
|    Aaron Allen|      4|
|    Aaron Baker|      0|
|   Aaron Bryant|      3|
+---------------+-------+
only showing top 5 rows



# Recommender System in Spark

## Mengimpor modul dan membuat sesi spark

In [105]:
#import module
from pyspark.ml.recommendation import ALS

#create session
appName = "Recommender system in Spark"
spark = SparkSession \
    .builder \
    .appName(appName) \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Membaca data file kedalam dataframe spark

In [106]:
#read file into dataFrame using automatically inferred schema
ratings = spark.read.csv('dataset/ratings.csv', inferSchema=True, header=True)
movies = spark.read.csv('dataset/movies.csv', inferSchema=True, header=True)
#merge "movies" and "ratings" dataFrame based on "movieId"
ratings.join(movies, "movieId").show(3)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|     31|     1|   2.5|1260759144|Dangerous Minds (...|               Drama|
|   1029|     1|   3.0|1260759179|        Dumbo (1941)|Animation|Childre...|
|   1061|     1|   3.0|1260759182|     Sleepers (1996)|            Thriller|
+-------+------+------+----------+--------------------+--------------------+
only showing top 3 rows

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|     31|     1|   2.5|1260759144|Dangerous Minds (...|               Drama|
|   1029|     1|   3.0|1260759179|        Dumbo (1941)|Animation|Childre...|
|   1061|     1|   3.0|1260759182|     Sleepers (19

## Persiapan Data

In [107]:
#use only column data of "userId", "movieId", dan "rating"
data = ratings.select("userId", "movieId", "rating")
#divide data, 70% for training and 30% for testing
splits = data.randomSplit([0.7, 0.3])
train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")
#calculate number of rows
train_rows = train.count()
test_rows = test.count()
print ("number of training data rows:", train_rows, 
       ", number of testing data rows:", test_rows)

number of training data rows: 69982 , number of testing data rows: 30022
number of training data rows: 69982 , number of testing data rows: 30022


## Mendefinisikan model dan melatihnya

In [108]:
#define ALS (Alternating Least Square) as our recommender system
als = ALS(maxIter=19, regParam=0.01, userCol="userId", 
          itemCol="movieId", ratingCol="label")
#train our ALS model
model = als.fit(train)
print("Training is done!")

Training is done!
Training is done!


## Memprediksi data testing

In [109]:
prediction = model.transform(test)
print("testing is done!")

testing is done!
testing is done!


In [110]:
prediction.join(movies, "movieId").select(
    "userId", "title", "prediction", "trueLabel").show(n=10, truncate=False)

+------+---------------------------+----------+---------+
|userId|title                      |prediction|trueLabel|
+------+---------------------------+----------+---------+
|232   |Guilty as Sin (1993)       |4.088933  |4.0      |
|126   |Hudsucker Proxy, The (1994)|3.645697  |5.0      |
|491   |Hudsucker Proxy, The (1994)|5.2743235 |3.0      |
|452   |Hudsucker Proxy, The (1994)|4.543952  |3.0      |
|92    |Hudsucker Proxy, The (1994)|2.0869389 |4.0      |
|309   |Hudsucker Proxy, The (1994)|5.1569223 |4.0      |
|607   |Hudsucker Proxy, The (1994)|4.127407  |4.0      |
|15    |Hudsucker Proxy, The (1994)|2.9285796 |3.0      |
|659   |Hudsucker Proxy, The (1994)|4.0024414 |4.0      |
|241   |Hudsucker Proxy, The (1994)|3.702203  |4.0      |
+------+---------------------------+----------+---------+
only showing top 10 rows

+------+---------------------------+----------+---------+
|userId|title                      |prediction|trueLabel|
+------+---------------------------+----------

## Mengevaluasi akurasi model

In [111]:
#import RegressionEvaluator since we also want to calculate RMSE (Root Mean Square Error)
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="trueLabel", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(prediction)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): nan
Root Mean Square Error (RMSE): nan


In [112]:
prediction.count()
a = prediction.count()
print("number of original data rows: ", a)
#drop rows with any missing data
cleanPred = prediction.dropna(how="any", subset=["prediction"])
b = cleanPred.count()
print("number of rows after dropping data with missing value: ", b)
print("number of missing data: ", a-b)

number of original data rows:  30022
number of original data rows:  30022
number of rows after dropping data with missing value:  28769
number of missing data:  1253
number of rows after dropping data with missing value:  28769
number of missing data:  1253


In [113]:
rmse = evaluator.evaluate(cleanPred)
print ("Root Mean Square Error (RMSE):", rmse)

Root Mean Square Error (RMSE): 1.2582324528766913
Root Mean Square Error (RMSE): 1.2582324528766913
