In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

In [3]:
# Example: Linear Regression with Spark MLlib
from pyspark.ml.regression import LinearRegression

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

24/12/04 02:06:07 WARN Utils: Your hostname, codespaces-fcac35 resolves to a loopback address: 127.0.0.1; using 10.0.0.62 instead (on interface eth0)
24/12/04 02:06:07 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/04 02:06:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/04 02:06:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/12/04 02:06:08 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
24/12/04 02:06:12 WARN Instrumentation: [c6b75b48] regParam is zero, which might cause numerical instability and overfitting.
24/12/04 02:06:13 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/04 02:06:14 WARN InstanceBuilder: Failed to load imp

Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


In [4]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Inisialisasi SparkSession
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Contoh dataset
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# mengubah kolom feature1 dan feature2 menjadi vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# melatih model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# menampilkan hasil
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


24/12/04 02:06:18 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Coefficients: [-12.262057918200023,4.087352262827852]
Intercept: 11.568912715666139


In [5]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# contoh dataset
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# menguhab kolom feature1 dan feature2 menjadi vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# melatih model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# menampilkan hasil
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')


Coefficients: [-12.262057935078403,4.087352268454035]
Intercept: 11.568912732544247


# Homework

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder.appName("Homework").getOrCreate()
dataPath = "actx.us.txt"
df = spark.read.csv(dataPath, header=True, inferSchema=True)
df.show(5)

df = df.withColumn("Target", (df["Close"] > df["Open"]).cast("int"))
df = df.select("Open", "High", "Low", "Close", "Volume", "Target")
df = df.na.drop()

featureCols = ["Open", "High", "Low", "Close", "Volume"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="Features")
df = assembler.transform(df)
trainDf, testDf = df.randomSplit([0.8, 0.2], seed=42)

lr = LogisticRegression(featuresCol="Features", labelCol="Target")
model = lr.fit(trainDf)
predictions = model.transform(testDf)
evaluator = MulticlassClassificationEvaluator(labelCol="Target", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Model Accuracy: {accuracy}")

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()
crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = crossval.fit(trainDf)
bestModel = cvModel.bestModel
predictions = bestModel.transform(testDf)
accuracy = evaluator.evaluate(predictions)
print(f"Best Model Accuracy after Tuning: {accuracy}")



+----------+------+------+------+------+------+-------+
|      Date|  Open|  High|   Low| Close|Volume|OpenInt|
+----------+------+------+------+------+------+-------+
|2015-04-30|14.793|14.852|14.714|14.714| 39474|      0|
|2015-05-01|14.813|14.891|14.764|14.881|  8851|      0|
|2015-05-04| 14.96|15.029|14.891|14.921| 17296|      0|
|2015-05-05|14.911|14.931|14.685|14.704| 81695|      0|
|2015-05-06|14.695|14.695|14.557|14.596| 10175|      0|
+----------+------+------+------+------+------+-------+
only showing top 5 rows

Model Accuracy: 1.0
Best Model Accuracy after Tuning: 0.6666666666666666
