<a href="https://colab.research.google.com/github/salwazpw/2023_BigData/blob/main/Spark%20MLlib.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Implementasi Spark MLlib dengan File CSV**

In [1]:
#Instalasi pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=861240d537edb4b340bfd959a7f65f9484d1e40fe42fdbc389e8f6b8780a5de1
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
#Instalasi findspark
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


**Import Library**

In [3]:
import pyspark
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder\
.master("local")\
.appName("Colab")\
.config('spark.ui.port', '4050')\
.getOrCreate()

spark

In [12]:
df = spark.read.format("csv").load("employees.csv", header=True, inferSchema=True)

df.printSchema()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)



In [13]:
df.show()

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|            - |       201|           20|


In [14]:
df.columns

['EMPLOYEE_ID',
 'FIRST_NAME',
 'LAST_NAME',
 'EMAIL',
 'PHONE_NUMBER',
 'HIRE_DATE',
 'JOB_ID',
 'SALARY',
 'COMMISSION_PCT',
 'MANAGER_ID',
 'DEPARTMENT_ID']

In [15]:
from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn('id', monotonically_increasing_id())

df = df[['id'] + df.columns[:-1]]

df.show()

+---+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
| id|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+---+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|  0|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|  1|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|
|  2|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|
|  3|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20|
|  4|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6000|         

In [16]:
df.count()

50

In [17]:
df.select('SALARY').agg({'SALARY': 'avg'}).show()

+-----------+
|avg(SALARY)|
+-----------+
|    6182.32|
+-----------+



In [23]:
from pyspark.ml.feature import VectorAssembler
featureassembler=VectorAssembler(inputCols=["id", "SALARY"], outputCol="ID_SALARY")

In [24]:
output = featureassembler.transform(df)

In [25]:
output.show()

+---+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+--------------+
| id|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|     ID_SALARY|
+---+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+--------------+
|  0|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|  [0.0,2600.0]|
|  1|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  2600|            - |       124|           50|  [1.0,2600.0]|
|  2|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4400|            - |       101|           10|  [2.0,4400.0]|
|  3|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13000|            - |       100|           20| [3.0,13

In [26]:
output.columns

['id',
 'EMPLOYEE_ID',
 'FIRST_NAME',
 'LAST_NAME',
 'EMAIL',
 'PHONE_NUMBER',
 'HIRE_DATE',
 'JOB_ID',
 'SALARY',
 'COMMISSION_PCT',
 'MANAGER_ID',
 'DEPARTMENT_ID',
 'ID_SALARY']

In [28]:
final = output.select("ID_SALARY", "SALARY")

In [29]:
final.show()

+--------------+------+
|     ID_SALARY|SALARY|
+--------------+------+
|  [0.0,2600.0]|  2600|
|  [1.0,2600.0]|  2600|
|  [2.0,4400.0]|  4400|
| [3.0,13000.0]| 13000|
|  [4.0,6000.0]|  6000|
|  [5.0,6500.0]|  6500|
| [6.0,10000.0]| 10000|
| [7.0,12008.0]| 12008|
|  [8.0,8300.0]|  8300|
| [9.0,24000.0]| 24000|
|[10.0,17000.0]| 17000|
|[11.0,17000.0]| 17000|
| [12.0,9000.0]|  9000|
| [13.0,6000.0]|  6000|
| [14.0,4800.0]|  4800|
| [15.0,4800.0]|  4800|
| [16.0,4200.0]|  4200|
|[17.0,12008.0]| 12008|
| [18.0,9000.0]|  9000|
| [19.0,8200.0]|  8200|
+--------------+------+
only showing top 20 rows



In [30]:
from pyspark.ml.regression import LinearRegression

train_data, test_data = final.randomSplit([0.70, 0.30])
regressor = LinearRegression(featuresCol='ID_SALARY', labelCol='SALARY')
regressor = regressor.fit(train_data)

In [31]:
regressor.coefficients

DenseVector([-0.0, 1.0])

In [32]:
regressor.intercept

5.276708495092481e-12

In [33]:
result = regressor.evaluate(test_data)

In [34]:
result.predictions.show()

+--------------+------+------------------+
|     ID_SALARY|SALARY|        prediction|
+--------------+------+------------------+
|  [2.0,4400.0]|  4400| 4400.000000000004|
|  [4.0,6000.0]|  6000| 6000.000000000003|
| [6.0,10000.0]| 10000|           10000.0|
|[10.0,17000.0]| 17000|16999.999999999996|
|[17.0,12008.0]| 12008|12007.999999999998|
| [20.0,7700.0]|  7700|            7700.0|
| [22.0,6900.0]|  6900|            6900.0|
|[23.0,11000.0]| 11000|10999.999999999998|
| [25.0,2900.0]|  2900| 2900.000000000002|
| [35.0,2700.0]|  2700| 2700.000000000001|
| [36.0,2400.0]|  2400| 2400.000000000001|
| [38.0,3300.0]|  3300|3300.0000000000005|
| [41.0,2100.0]|  2100| 2100.000000000001|
| [47.0,3200.0]|  3200|3199.9999999999995|
| [48.0,2700.0]|  2700|            2700.0|
+--------------+------+------------------+



In [35]:
result.meanAbsoluteError, result.meanSquaredError

(1.2732925824820995e-12, 3.1157136406164042e-24)