In [None]:
# Example: Linear Regression with Spark MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

# Inisialisasi Spark
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Contoh dataset
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# Menggabungkan fitur menjadi satu kolom vektor
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# Menyusun dan melatih model logistic regression
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# Menampilkan koefisien dan intercept
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

# Menutup Spark
spark.stop()

Coefficients: [-12.262057929180484,4.087352266486688]
Intercept: 11.56891272665312


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Inisialisasi Spark
spark = SparkSession.builder \
    .getOrCreate()

# Contoh dataset
data = [(1, 1.0, 1.0), (2, 5.0, 5.0), (3, 10.0, 10.0), (4, 15.0, 15.0)]
columns = ['ID', 'Feature1', 'Feature2']
df = spark.createDataFrame(data, columns)

# Menggabungkan fitur menjadi satu kolom vektor
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# Melatih model KMeans
kmeans = KMeans(featuresCol='Features', k=2)
model = kmeans.fit(df)

# Menampilkan pusat cluster
centers = model.clusterCenters()
print(f'Cluster Centers: {centers}')

# Menutup Spark
spark.stop()

Cluster Centers: [array([5.33333333, 5.33333333]), array([15., 15.])]


HOMEWORK

In [None]:
from pyspark.sql import SparkSession

# Membuat SparkSession
spark = SparkSession.builder \
    .getOrCreate()

# Memuat dataset dari file CSV
df = spark.read.csv("/content/Rental_Properties.csv", header=True, inferSchema=True)

# Menampilkan beberapa baris dari dataset
df.show()

+---+------------+-----+--------------------+-----+----------+----+-----+----+-----+----------------+-----+-----------+-----+----------+------------+----------------+-------+
| Id|        City|State|      Street_Address|  Zip|House_Type|Beds|Baths|Sqft|Price|Security_Deposit| Pets|Smoking_Ind| Pool|Dishwasher|Washer_Dryer|Air_Conditioning|Parking|
+---+------------+-----+--------------------+-----+----------+----+-----+----+-----+----------------+-----+-----------+-----+----------+------------+----------------+-------+
|  1|   Marlinton|   WV|        805 10th Ave|24954| Apartment|   1|  1.0| 550|$275 |           $275 | TRUE|      false|false|     false|       false|           false|  false|
|  2|Sylvan Grove|   KS|312 N. Pennsylvan...|67481| Apartment|   1|  1.0| 588|$300 |           $300 |FALSE|      false|false|     false|       false|           false|  false|
|  3|   Ishpeming|   MI|550 Cleveland Avenue|49849| Apartment|   1|  1.0| 784|$304 |           $304 | TRUE|      false|false|

In [None]:
# Memeriksa skema DataFrame
df.printSchema()

# Memeriksa jumlah nilai yang hilang
df.describe().show()

# Menghapus baris dengan nilai yang hilang
df_clean = df.dropna()

root
 |-- Id: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Street_Address: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- House_Type: string (nullable = true)
 |-- Beds: string (nullable = true)
 |-- Baths: double (nullable = true)
 |-- Sqft: integer (nullable = true)
 |-- Price: string (nullable = true)
 |-- Security_Deposit: string (nullable = true)
 |-- Pets: string (nullable = true)
 |-- Smoking_Ind: boolean (nullable = true)
 |-- Pool: boolean (nullable = true)
 |-- Dishwasher: boolean (nullable = true)
 |-- Washer_Dryer: boolean (nullable = true)
 |-- Air_Conditioning: boolean (nullable = true)
 |-- Parking: boolean (nullable = true)

+-------+------------------+---------+------+--------------------+------------------+----------+------------------+------------------+------------------+-----------------+----------------+------+
|summary|                Id|     City| State|      Street_Address|            

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

# Inisialisasi Spark
spark = SparkSession.builder \
    .appName("RentalPropertiesPreparation") \
    .getOrCreate()

# Membaca dataset dari file CSV
df = spark.read.csv("/content/Rental_Properties.csv", header=True, inferSchema=True)

# Menampilkan beberapa baris dari dataset
df.show(5)

# Mengonversi kolom harga menjadi float
df = df.withColumn("Price", df["Price"].substr(2, 10).cast("float"))  # Menghilangkan tanda '$'

# Mengonversi kolom-kolom yang diperlukan menjadi numerik
df = df.withColumn("Beds", df["Beds"].cast("integer")) \
       .withColumn("Baths", df["Baths"].cast("integer")) \
       .withColumn("Sqft", df["Sqft"].cast("integer")) \
       .withColumn("Security_Deposit", df["Security_Deposit"].substr(2, 10).cast("float"))  # Menghilangkan tanda '$'

# Menentukan fitur yang akan digunakan
feature_columns = ['Beds', 'Baths', 'Sqft', 'Security_Deposit']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Menerapkan VectorAssembler untuk dataset
df = assembler.transform(df)

# Mempersiapkan label (misal, klasifikasi berdasarkan harga)
df = df.withColumn("label", (df["Price"] > 400).cast("integer"))

# Menampilkan dataset yang sudah dipersiapkan
df.select("features", "label").show(5)

# Menutup Spark
spark.stop()

+---+------------+-----+--------------------+-----+----------+----+-----+----+-----+----------------+-----+-----------+-----+----------+------------+----------------+-------+
| Id|        City|State|      Street_Address|  Zip|House_Type|Beds|Baths|Sqft|Price|Security_Deposit| Pets|Smoking_Ind| Pool|Dishwasher|Washer_Dryer|Air_Conditioning|Parking|
+---+------------+-----+--------------------+-----+----------+----+-----+----+-----+----------------+-----+-----------+-----+----------+------------+----------------+-------+
|  1|   Marlinton|   WV|        805 10th Ave|24954| Apartment|   1|  1.0| 550|$275 |           $275 | TRUE|      false|false|     false|       false|           false|  false|
|  2|Sylvan Grove|   KS|312 N. Pennsylvan...|67481| Apartment|   1|  1.0| 588|$300 |           $300 |FALSE|      false|false|     false|       false|           false|  false|
|  3|   Ishpeming|   MI|550 Cleveland Avenue|49849| Apartment|   1|  1.0| 784|$304 |           $304 | TRUE|      false|false|

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Inisialisasi Spark
spark = SparkSession.builder \
    .appName("RentalPropertiesClassification") \
    .getOrCreate()

# Membaca dataset dari file CSV
df = spark.read.csv("/content/Rental_Properties.csv", header=True, inferSchema=True)

# Menampilkan skema untuk memeriksa kolom
df.printSchema()

# Mengonversi kolom harga menjadi float
df = df.withColumn("Price", df["Price"].substr(2, 10).cast("float"))  # Menghilangkan tanda '$'

# Mengonversi kolom-kolom yang diperlukan menjadi numerik
df = df.withColumn("Beds", df["Beds"].cast("integer")) \
       .withColumn("Baths", df["Baths"].cast("integer")) \
       .withColumn("Sqft", df["Sqft"].cast("integer")) \
       .withColumn("Security_Deposit", df["Security_Deposit"].substr(2, 10).cast("float"))  # Menghilangkan tanda '$'

# Memeriksa apakah ada nilai null dan menghapusnya
df = df.na.drop()

# Mempersiapkan label (misal, klasifikasi berdasarkan harga)
df = df.withColumn("label", (df["Price"] > 400).cast("integer"))

# Menentukan fitur yang akan digunakan
feature_columns = ['Beds', 'Baths', 'Sqft', 'Security_Deposit']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Menerapkan VectorAssembler untuk dataset
df = assembler.transform(df)

# Membagi data menjadi training dan testing
train, test = df.randomSplit([0.8, 0.2], seed=1234)

# Membangun model Logistic Regression
lr = LogisticRegression(featuresCol='features', labelCol='label')
model = lr.fit(train)

# Melakukan prediksi pada data testing
predictions = model.transform(test)

# Menampilkan beberapa prediksi
predictions.select("features", "label", "prediction").show(5)

# Evaluasi model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f'Akurasi Model: {accuracy:.2f}')

# Menutup Spark
spark.stop()

root
 |-- Id: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Street_Address: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- House_Type: string (nullable = true)
 |-- Beds: string (nullable = true)
 |-- Baths: double (nullable = true)
 |-- Sqft: integer (nullable = true)
 |-- Price: string (nullable = true)
 |-- Security_Deposit: string (nullable = true)
 |-- Pets: string (nullable = true)
 |-- Smoking_Ind: boolean (nullable = true)
 |-- Pool: boolean (nullable = true)
 |-- Dishwasher: boolean (nullable = true)
 |-- Washer_Dryer: boolean (nullable = true)
 |-- Air_Conditioning: boolean (nullable = true)
 |-- Parking: boolean (nullable = true)

+--------------------+-----+----------+
|            features|label|prediction|
+--------------------+-----+----------+
|[1.0,1.0,588.0,30...|    0|       1.0|
|[1.0,1.0,600.0,50...|    0|       1.0|
|[1.0,1.0,570.0,39...|    0|       1.0|
|[1.0,1.0,555.0,40...|    0|      

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Inisialisasi Spark
spark = SparkSession.builder \
    .appName("RentalPropertiesClassification") \
    .getOrCreate()

# Membaca dataset dari file CSV
df = spark.read.csv("/content/Rental_Properties.csv", header=True, inferSchema=True)

# Menampilkan skema untuk memeriksa kolom
df.printSchema()

# Mengonversi kolom harga menjadi float
df = df.withColumn("Price", df["Price"].substr(2, 10).cast("float"))  # Menghilangkan tanda '$'

# Mengonversi kolom yang diperlukan menjadi numerik
df = df.withColumn("Beds", df["Beds"].cast("integer")) \
       .withColumn("Baths", df["Baths"].cast("integer")) \
       .withColumn("Sqft", df["Sqft"].cast("integer")) \
       .withColumn("Security_Deposit", df["Security_Deposit"].substr(2, 10).cast("float"))  # Menghilangkan tanda '$'

# Memeriksa apakah ada nilai null dan menghapusnya
df = df.na.drop()

# Mempersiapkan label (misal, klasifikasi berdasarkan harga)
df = df.withColumn("label", (df["Price"] > 400).cast("integer"))

# Menentukan fitur yang akan digunakan
feature_columns = ['Beds', 'Baths', 'Sqft', 'Security_Deposit']  # Pastikan kolom ini ada
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# Menerapkan VectorAssembler untuk dataset
df = assembler.transform(df)

# Membagi data menjadi training dan testing
train, test = df.randomSplit([0.8, 0.2], seed=1234)

# Menyusun model Logistic Regression
lr = LogisticRegression(featuresCol='features', labelCol='label')

# Menyusun grid parameter untuk cross-validation
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# Menyusun cross-validator
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy'),
                          numFolds=3)

# Melatih model dengan cross-validation
cvModel = crossval.fit(train)

# Melakukan prediksi pada data testing
predictions = cvModel.transform(test)

# Evaluasi model
evaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)
print(f'Akurasi Model setelah Tuning: {accuracy:.2f}')

# Menutup Spark
spark.stop()

root
 |-- Id: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Street_Address: string (nullable = true)
 |-- Zip: string (nullable = true)
 |-- House_Type: string (nullable = true)
 |-- Beds: string (nullable = true)
 |-- Baths: double (nullable = true)
 |-- Sqft: integer (nullable = true)
 |-- Price: string (nullable = true)
 |-- Security_Deposit: string (nullable = true)
 |-- Pets: string (nullable = true)
 |-- Smoking_Ind: boolean (nullable = true)
 |-- Pool: boolean (nullable = true)
 |-- Dishwasher: boolean (nullable = true)
 |-- Washer_Dryer: boolean (nullable = true)
 |-- Air_Conditioning: boolean (nullable = true)
 |-- Parking: boolean (nullable = true)

Akurasi Model setelah Tuning: 1.00
