## PYSPARK

### IMPORT LIBRARY FROM PYSPARK 

In [247]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
from pyspark.sql.functions import col
from pyspark.sql import functions as F
from tabulate import tabulate

### INIT SESSION SPARK 

In [248]:
# Inisialisasi sesi Spark
spark = SparkSession.builder.appName("appbg").getOrCreate()

In [249]:
spark

### Merge Data

In [250]:
# Baca file CSV pertama (data-user.csv)
df_user = spark.read.csv("file:///_script\_csv\data-user.csv", header=True, inferSchema=True)

# Baca file CSV kedua (plat.csv)
df_plat = spark.read.csv("file:///_script\_csv\combined-plate.csv", header=True, inferSchema=True)

##### Cleansing

In [251]:
df_user = df_user.withColumnRenamed("_id", "id")
df_user = df_user.withColumnRenamed("plateNumber", "plate")

df_plat = df_plat.withColumnRenamed("_id", "id")

In [252]:
df_user.show(5)

+--------------------+------------+--------------------+--------------------+--------+-----+--------+--------------------+--------------------+
|                  id|    username|               email|      profilePicture|hourStay| cost|   plate|         phoneNumber|           createdAt|
+--------------------+------------+--------------------+--------------------+--------+-----+--------+--------------------+--------------------+
|6649ae85d0125561e...|Unnecessarya|mafalda.baumbach@...|https://picsum.ph...|       1| 5000|B3C5V7SA|1-320-682-2369 x0658|2024-05-19 07:47:...|
|6649ae85d0125561e...|Paintedstriv|antoinette_oconne...|https://picsum.ph...|       3|15000|B5P9V2NC|   609-371-0711 x417|2024-05-19 07:47:...|
|6649ae85d0125561e...|Elongatedfli|jazmin.hintz@hotm...|https://picsum.ph...|      10|50000|B9X4P1NF|1-479-624-3676 x6...|2024-05-19 07:47:...|
|6649ae86d0125561e...|   Milkyomen|karson.mccullough...|https://picsum.ph...|       3|15000|B3C5V7SA| 1-318-806-0229 x693|2024-05-19 07:

In [253]:
df_plat.show(5)

+--------------------+--------+------+----------+----+
|                  id|   plate|source|last_digit|type|
+--------------------+--------+------+----------+----+
|6649a87e4817b6fcf...|B8C2X4VE|   car|         4|even|
|6649a87e4817b6fcf...|B1M3U8RG|   car|         8|even|
|6649a87e4817b6fcf...|B4B7X8DE|   car|         8|even|
|6649a87e4817b6fcf...|B1N3P8VG|   car|         8|even|
|6649a87e4817b6fcf...|B2G6K4XH|   car|         4|even|
+--------------------+--------+------+----------+----+
only showing top 5 rows



In [254]:
df_merge = df_plat.join(df_user, "plate")

In [255]:
# Menampilkan hasil data yang sudah di gabungkan 
df_merge.show(5)

+--------+--------------------+------+----------+----+--------------------+------------+--------------------+--------------------+--------+-----+--------------------+--------------------+
|   plate|                  id|source|last_digit|type|                  id|    username|               email|      profilePicture|hourStay| cost|         phoneNumber|           createdAt|
+--------+--------------------+------+----------+----+--------------------+------------+--------------------+--------------------+--------+-----+--------------------+--------------------+
|B3C5V7SA|6649a87e4817b6fcf...|   car|         7| odd|6649ae85d0125561e...|Unnecessarya|mafalda.baumbach@...|https://picsum.ph...|       1| 5000|1-320-682-2369 x0658|2024-05-19 07:47:...|
|B5P9V2NC|6649a87e4817b6fcf...|   car|         2|even|6649ae85d0125561e...|Paintedstriv|antoinette_oconne...|https://picsum.ph...|       3|15000|   609-371-0711 x417|2024-05-19 07:47:...|
|B9X4P1NF|6649a87e4817b6fcf...|   car|         1| odd|6649ae

##### Mendapatkan insight data

In [256]:
# Menampilkan hasil data yang mudah di baca
df_merge = df_merge.withColumn("createdAt", date_format(df_merge["createdAt"], "yyyy-MM-dd HH:mm:ss"))
tabulate_df = df_merge.select("plate", "username", "hourStay", "cost", "createdAt", "source", "type").toPandas()

print(tabulate(tabulate_df.head(100), headers='keys', tablefmt='fancy_grid'))

╒════╤══════════╤══════════════╤════════════╤════════╤═════════════════════╤══════════╤════════╕
│    │ plate    │ username     │   hourStay │   cost │ createdAt           │ source   │ type   │
╞════╪══════════╪══════════════╪════════════╪════════╪═════════════════════╪══════════╪════════╡
│  0 │ B3C5V7SA │ Unnecessarya │          1 │   5000 │ 2024-05-19 07:47:17 │ car      │ odd    │
├────┼──────────┼──────────────┼────────────┼────────┼─────────────────────┼──────────┼────────┤
│  1 │ B5P9V2NC │ Paintedstriv │          3 │  15000 │ 2024-05-19 07:47:17 │ car      │ even   │
├────┼──────────┼──────────────┼────────────┼────────┼─────────────────────┼──────────┼────────┤
│  2 │ B9X4P1NF │ Elongatedfli │         10 │  50000 │ 2024-05-19 07:47:17 │ car      │ odd    │
├────┼──────────┼──────────────┼────────────┼────────┼─────────────────────┼──────────┼────────┤
│  3 │ B3C5V7SA │ Milkyomen    │          3 │  15000 │ 2024-05-19 07:47:17 │ car      │ odd    │
├────┼──────────┼─────────────

In [257]:
# Menghitung jumlah odd dan even type car dari df_merge
odd_count = df_merge.filter(col("type") == "odd").count()
even_count = df_merge.filter(col("type") == "even").count()

print("Jumlah odd type car dari df_merge:", odd_count)
print("Jumlah even type car dari df_merge:", even_count)

Jumlah odd type car dari df_merge: 70
Jumlah even type car dari df_merge: 30


In [258]:
# Menghitung total cost plat kendaraan dengan type 'odd'
total_cost_odd = df_merge.filter(df_merge['type'] == 'odd').agg(F.sum('cost').alias('total_cost_odd')).collect()[0]['total_cost_odd']

# Menghitung total cost plat kendaraan dengan type 'even'
total_cost_even = df_merge.filter(df_merge['type'] == 'even').agg(F.sum('cost').alias('total_cost_even')).collect()[0]['total_cost_even']

print("Total cost plat kendaraan dengan type 'odd':", total_cost_odd)
print("Total cost plat kendaraan dengan type 'even':", total_cost_even)

Total cost plat kendaraan dengan type 'odd': 1835000
Total cost plat kendaraan dengan type 'even': 795000


In [259]:
# Menghitung total hourStay
total_hourStay = df_merge.agg(F.sum('hourStay').alias('total_hourStay')).collect()[0]['total_hourStay']

print("Total hourStay:", total_hourStay)

Total hourStay: 526


In [260]:
# Rata-rata hourStay dan cost
avg_hourStay = df_merge.agg(F.avg('hourStay').alias('avg_hourStay')).collect()[0]['avg_hourStay']
avg_cost = df_merge.agg(F.avg('cost').alias('avg_cost')).collect()[0]['avg_cost']

print("Rata-rata hourStay:", avg_hourStay)
print("Rata-rata cost:", avg_cost)

Rata-rata hourStay: 5.26
Rata-rata cost: 26300.0


In [261]:
# Plat kendaraan dengan hourStay tertinggi
max_hourStay_plate = df_merge.groupBy('plate').agg(F.sum('hourStay').alias('total_hourStay')).orderBy('total_hourStay', ascending=False).first()['plate']
print("Plat kendaraan dengan hourStay tertinggi:", max_hourStay_plate)

Plat kendaraan dengan hourStay tertinggi: B8Z1N4KA


In [262]:
# Distribusi hourStay dan cost
hourStay_distribution = df_merge.groupBy('hourStay').count().orderBy('hourStay')
cost_distribution = df_merge.groupBy('cost').count().orderBy('cost')

print("Distribusi hourStay:")
hourStay_distribution.show()

Distribusi hourStay:
+--------+-----+
|hourStay|count|
+--------+-----+
|       1|   12|
|       2|   10|
|       3|   15|
|       4|    9|
|       5|    7|
|       6|   10|
|       7|    8|
|       8|    9|
|       9|   10|
|      10|   10|
+--------+-----+



In [263]:
# Biaya parkir rata-rata per jam
avg_cost_per_hour = df_merge.agg((F.sum('cost') / F.sum('hourStay')).alias('avg_cost_per_hour')).collect()[0]['avg_cost_per_hour']
print("Biaya parkir rata-rata per jam:", avg_cost_per_hour)

Biaya parkir rata-rata per jam: 5000.0


In [264]:
# Analisis perbandingan antara plat kendaraan
comparison = df_merge.groupBy('plate').agg(F.sum('hourStay').alias('total_hourStay'), F.sum('cost').alias('total_cost')).orderBy('total_hourStay', ascending=False)
print("Analisis perbandingan antara plat kendaraan:")
comparison.show(10)

Analisis perbandingan antara plat kendaraan:
+--------+--------------+----------+
|   plate|total_hourStay|total_cost|
+--------+--------------+----------+
|B8Z1N4KA|            26|    130000|
|B1S8U5RD|            22|    110000|
|B5P9V2NC|            20|    100000|
|B9X4P1NF|            20|    100000|
|B3C5V7SA|            19|     95000|
|B1D8W7ME|            19|     95000|
|B2X4V5TH|            17|     85000|
|B5S7U8TB|            16|     80000|
|B1N3U8ME|            15|     75000|
|B8H3U9PA|            13|     65000|
+--------+--------------+----------+
only showing top 10 rows



In [265]:
# Tren parkir dari waktu ke waktu
parking_trend = df_merge.groupBy('createdAt').count().orderBy('createdAt')
print("Tren parkir dari waktu ke waktu:")
parking_trend.show(10)

Tren parkir dari waktu ke waktu:
+-------------------+-----+
|          createdAt|count|
+-------------------+-----+
|2024-05-19 07:47:17|    4|
|2024-05-19 07:47:18|    6|
|2024-05-19 07:48:07|    1|
|2024-05-19 07:48:08|    7|
|2024-05-19 07:48:09|    2|
|2024-05-19 07:48:41|    2|
|2024-05-19 07:48:42|    5|
|2024-05-19 07:48:43|    3|
|2024-05-19 07:48:56|    3|
|2024-05-19 07:48:57|    4|
+-------------------+-----+
only showing top 10 rows



In [266]:
# Korelasi antara hourStay dan cost
correlation = df_merge.corr('hourStay', 'cost')
print("Korelasi antara hourStay dan cost:", correlation)

Korelasi antara hourStay dan cost: 1.0000000000000002


In [267]:
# Menghitung total hourStay dari masing-masing plat kendaraan
total_hourStay_per_plate = df_merge.groupBy('plate').agg(F.sum('hourStay').alias('total_hourStay'))

# Menampilkan total_hourStay dari masing-masing plat kendaraan
total_hourStay_per_plate.show(10)

+--------+--------------+
|   plate|total_hourStay|
+--------+--------------+
|B6P2Z4UA|            10|
|B3P9Z1CE|            10|
|B6R1T7NE|             5|
|B3S8Z2UD|             7|
|B2X4V9RA|             5|
|B5M2W3CI|             2|
|B8P5F3TA|            13|
|B3C5V7SA|            19|
|B1H9V5RH|            13|
|B1N3U8ME|            15|
+--------+--------------+
only showing top 10 rows



### SPARK MACHINE LEARNING

In [268]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

##### Vektor

In [269]:
# Membuat vektor fitur
assembler = VectorAssembler(inputCols=["hourStay"], outputCol="features")
df_merge = assembler.transform(df_merge)

##### Preprocessing

In [270]:
# Bagi data menjadi set pelatihan dan set pengujian
(train_data, test_data) = df_merge.randomSplit([0.8, 0.2], seed=42)

In [271]:
# Inisialisasi model Regresi Linear
lr = LinearRegression(featuresCol="features", labelCol="cost")

In [272]:
# Latih model
lr_model = lr.fit(train_data)

##### Evaluate model

In [273]:
# Prediksi pada set pengujian
predictions = lr_model.transform(test_data)

In [274]:
# Evaluasi model menggunakan Mean Squared Error
evaluator = RegressionEvaluator(labelCol="cost", predictionCol="prediction", metricName="mse")
mse = evaluator.evaluate(predictions)
print(f"Mean Squared Error: {mse}")

Mean Squared Error: 3.133554791083234e-23


##### Test result 

In [275]:
# Prediksi pada set pengujian
predictions = lr_model.transform(test_data)

# Menyimpan hasil prediksi ke dalam DataFrame baru
predictions_df = predictions.select("plate", "hourStay", "cost", "prediction")

# Menampilkan hasil prediksi
predictions_df.show(5)

+--------+--------+-----+-----------------+
|   plate|hourStay| cost|       prediction|
+--------+--------+-----+-----------------+
|B1H9V5RH|       6|30000|          30000.0|
|B1N3U8ME|       7|35000|35000.00000000001|
|B1R8N3XB|       1| 5000|4999.999999999989|
|B2G6K4XH|       5|25000|          25000.0|
|B2R7W9JD|       9|45000|45000.00000000001|
+--------+--------+-----+-----------------+
only showing top 5 rows

