In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

In [2]:
# Example: Linear Regression with Spark MLlib
from pyspark.ml.regression import LinearRegression

# Initialize Spark Session
spark = SparkSession.builder.appName('MLlib Example').getOrCreate()

# Load sample data
data = [(1, 5.0, 20.0), (2, 10.0, 25.0), (3, 15.0, 30.0), (4, 20.0, 35.0)]
columns = ['ID', 'Feature', 'Target']
df = spark.createDataFrame(data, columns)

# Prepare data for modeling
assembler = VectorAssembler(inputCols=['Feature'], outputCol='Features')
df_transformed = assembler.transform(df)

# Train a linear regression model
lr = LinearRegression(featuresCol='Features', labelCol='Target')
model = lr.fit(df_transformed)

# Print model coefficients
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

24/12/12 13:30:09 WARN Utils: Your hostname, codespaces-bb4c13 resolves to a loopback address: 127.0.0.1; using 10.0.0.237 instead (on interface eth0)
24/12/12 13:30:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/12 13:30:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/12/12 13:30:16 WARN Instrumentation: [50894ff4] regParam is zero, which might cause numerical instability and overfitting.
24/12/12 13:30:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/12 13:30:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/12/12 13:30:18 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


Coefficients: [0.9999999999999992]
Intercept: 15.000000000000009


24/12/12 13:30:23 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [3]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# Inisialisasi SparkSession
spark = SparkSession.builder.appName("LogisticRegressionExample").getOrCreate()

# Contoh dataset
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# mengubah kolom feature1 dan feature2 menjadi vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# melatih model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# menampilkan hasil
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

24/12/12 13:30:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Coefficients: [-12.262057891021882,4.087352253767593]
Intercept: 11.568912688492174


In [4]:
# Practice: Logistic Regression
from pyspark.ml.classification import LogisticRegression

# contoh dataset
data = [(1, 2.0, 3.0, 0), (2, 1.0, 5.0, 1), (3, 2.5, 4.5, 1), (4, 3.0, 6.0, 0)]
columns = ['ID', 'Feature1', 'Feature2', 'Label']
df = spark.createDataFrame(data, columns)

# menguhab kolom feature1 dan feature2 menjadi vector
assembler = VectorAssembler(inputCols=['Feature1', 'Feature2'], outputCol='Features')
df = assembler.transform(df)

# melatih model
lr = LogisticRegression(featuresCol='Features', labelCol='Label')
model = lr.fit(df)

# menampilkan hasil
print(f'Coefficients: {model.coefficients}')
print(f'Intercept: {model.intercept}')

Coefficients: [-12.262057891021882,4.087352253767593]
Intercept: 11.568912688492174


In [4]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Initialize SparkSession
spark = SparkSession.builder.appName("Homework").getOrCreate()

# Load data
dataPath = "1mm.csv"
df = spark.read.csv(dataPath, header=True, inferSchema=True)
df.show(5)

# Data preprocessing
df = df.withColumn("Target", (df["Rank"] > df["Interval"]).cast("int"))
df = df.select("Rank", "Num", "Interval", "Target")
print(f"Row count before dropping nulls: {df.count()}")
df = df.na.drop()
print(f"Row count after dropping nulls: {df.count()}")

# Feature engineering
featureCols = ["Rank", "Num", "Interval"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="Features")
df = assembler.transform(df)

# Train-test split
trainDf, testDf = df.randomSplit([0.8, 0.2], seed=42)

# Logistic regression model
lr = LogisticRegression(featuresCol="Features", labelCol="Target")
model = lr.fit(trainDf)
predictions = model.transform(testDf)

# Model evaluation
evaluator = BinaryClassificationEvaluator(labelCol="Target", rawPredictionCol="rawPrediction", metricName="areaUnderROC")
accuracy = evaluator.evaluate(predictions)
print(f"Model Area Under ROC: {accuracy}")

# Hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
cvModel = crossval.fit(trainDf)
bestModel = cvModel.bestModel

# Evaluate best model
predictions = bestModel.transform(testDf)
accuracy = evaluator.evaluate(predictions)
print(f"Best Model Area Under ROC after Tuning: {accuracy}")


                                                                                

+----+----+--------+
|Rank| Num|Interval|
+----+----+--------+
|   1|   2|       2|
|   2|   3|    NULL|
|   3|   5|       2|
|   4|NULL|    NULL|
|   5|  11|    NULL|
+----+----+--------+
only showing top 5 rows

Row count before dropping nulls: 999999


                                                                                

Row count after dropping nulls: 999995


24/12/12 14:18:07 ERROR LBFGS: Failure! Resetting history: breeze.optimize.StepSizeUnderflow: 
24/12/12 14:18:08 ERROR LBFGS: Failure again! Giving up and returning. Maybe the objective is just poorly behaved?
                                                                                

Model Area Under ROC: 1.0


24/12/12 14:20:06 WARN Instrumentation: [c8fc8bb3] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/12 14:20:09 WARN Instrumentation: [377f5f9d] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/12 14:20:10 WARN Instrumentation: [fb973d76] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/12 14:20:11 WARN Instrumentation: [ca7b6fae] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/12 14:20:12 WARN Instrumentation: [cfa7625a] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/12 14:20:12 WARN Instrumentation: [9cefb489] All labels are the same value and fitIntercept=true, so the coefficients will be zeros. Training is not needed.
24/12/12 14:20:13 WARN

Best Model Area Under ROC after Tuning: 1.0
